Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/examples/multi_metric_evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Simple example demonstrating multi-metric evaluation.

This toy evaluator checks multiple aspects of a response:
1. Length check (is response long enough?)
2. Keyword check (does it contain expected keywords?)
3. Sentiment check (is it positive?)

Each check produces its own metric, and they're aggregated into a final score.
"""

from strands_evals import Case, Dataset
from strands_evals.evaluators import Evaluator
from strands_evals.types import EvaluationData, EvaluationOutput


class MultiAspectEvaluator(Evaluator[str, str]):
    """Scores a text response along three independent dimensions.

    Emits one EvaluationOutput per dimension: minimum length,
    required-keyword coverage, and a naive positive-sentiment heuristic.
    """

    def __init__(self, min_length: int = 10, required_keywords: list[str] | None = None):
        """
        Args:
            min_length: Minimum number of characters the response must reach.
            required_keywords: Keywords expected in the response (matched
                case-insensitively). Defaults to no requirements.
        """
        super().__init__()
        self.min_length = min_length
        self.required_keywords = required_keywords or []

    def evaluate(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]:
        """Return one EvaluationOutput for each aspect checked."""
        text = evaluation_case.actual_output or ""
        lowered = text.lower()

        # Aspect 1: minimum-length gate.
        meets_length = len(text) >= self.min_length
        mark = "✓" if meets_length else "✗"
        length_metric = EvaluationOutput(
            score=1.0 if meets_length else 0.0,
            test_pass=meets_length,
            reason=f"Length: {len(text)} chars ({mark} min {self.min_length})",
        )

        # Aspect 2: fraction of required keywords present (case-insensitive).
        hits = [kw for kw in self.required_keywords if kw.lower() in lowered]
        if self.required_keywords:
            coverage = len(hits) / len(self.required_keywords)
        else:
            coverage = 1.0  # no requirements -> trivially satisfied
        keyword_metric = EvaluationOutput(
            score=coverage,
            test_pass=coverage >= 0.5,
            reason=f"Keywords: {len(hits)}/{len(self.required_keywords)} found {hits}",
        )

        # Aspect 3: crude sentiment heuristic — informational only, never fails.
        positive_words = ["good", "great", "excellent", "happy", "wonderful"]
        is_positive = any(word in lowered for word in positive_words)
        sentiment_metric = EvaluationOutput(
            score=1.0 if is_positive else 0.5,
            test_pass=True,  # always passes; purely informational
            reason=f"Sentiment: {'Positive' if is_positive else 'Neutral'}",
        )

        return [length_metric, keyword_metric, sentiment_metric]

    async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]:
        """Async wrapper; the checks are pure CPU work, so delegate to evaluate()."""
        return self.evaluate(evaluation_case)


# Task function: simple echo with modifications
def simple_task(query: str) -> str:
    """Echo-style toy task: embed *query* in a fixed, upbeat response template."""
    return "This is a great response to: " + query + ". It provides excellent information!"


if __name__ == "__main__":
    # Two sample cases: one short reply, one longer one, to exercise the
    # length threshold in both directions.
    test_cases = [
        Case[str, str](name="short-response", input="Hi", expected_output="Short reply"),
        Case[str, str](name="long-response", input="Tell me about Python", expected_output="Python is great"),
    ]

    # Require at least 20 characters and the keywords "response" and "information".
    evaluator = MultiAspectEvaluator(min_length=20, required_keywords=["response", "information"])

    # Run every case through the task and the evaluator.
    report = Dataset[str, str](cases=test_cases, evaluator=evaluator).run_evaluations(simple_task)

    # Walk the report programmatically: aggregate score plus per-metric detail.
    print("\n=== Programmatic Access to Detailed Results ===")
    rows = zip(report.cases, report.scores, report.detailed_results)
    for i, (case, score, metrics) in enumerate(rows):
        print(f"\nCase {i}: {case['name']}")
        print(f" Aggregate Score: {score:.2f}")
        print(f" Individual Metrics ({len(metrics)}):")
        for j, metric in enumerate(metrics, start=1):
            print(f" {j}. Score={metric.score:.2f}, Pass={metric.test_pass}, Reason={metric.reason}")

    # Hand off to the rich-based interactive display.
    print("\n=== Interactive Display ===")
    report.run_display()
42 changes: 28 additions & 14 deletions src/strands_evals/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,26 @@ async def _worker(self, queue: asyncio.Queue, task: Callable, results: list):
"gen_ai.eval.case.name": case_name,
},
) as eval_span:
evaluation_output = await self.evaluator.evaluate_async(evaluation_context)
evaluation_outputs = await self.evaluator.evaluate_async(evaluation_context)
(aggregate_score, aggregate_pass, aggregate_reason) = self.evaluator.aggregator(
evaluation_outputs
)
eval_span.set_attributes(
{
"gen_ai.eval.output.score": evaluation_output.score,
"gen_ai.eval.output.test_pass": evaluation_output.test_pass,
"gen_ai.eval.output.reason": evaluation_output.reason or "",
"gen_ai.eval.output.score": aggregate_score,
"gen_ai.eval.output.test_pass": aggregate_pass,
"gen_ai.eval.output.reason": aggregate_reason or "",
}
)

# Store results
results.append(
{
"case": evaluation_context.model_dump(),
"test_pass": evaluation_output.test_pass,
"score": evaluation_output.score,
"reason": evaluation_output.reason or "",
"test_pass": aggregate_pass,
"score": aggregate_score,
"reason": aggregate_reason or "",
"detailed_results": evaluation_outputs,
}
)

Expand All @@ -267,6 +271,7 @@ async def _worker(self, queue: asyncio.Queue, task: Callable, results: list):
"test_pass": False,
"score": 0,
"reason": f"An error occurred: {str(e)}",
"detailed_results": [],
}
)
finally:
Expand All @@ -288,6 +293,7 @@ def run_evaluations(self, task: Callable[[InputT], OutputT | dict[str, Any]]) ->
test_passes = []
cases: list = []
reasons = []
detailed_results = []

for case in self._cases:
case_name = case.name or f"case_{len(cases)}"
Expand Down Expand Up @@ -330,33 +336,39 @@ def run_evaluations(self, task: Callable[[InputT], OutputT | dict[str, Any]]) ->
"gen_ai.eval.case.name": case_name,
},
) as eval_span:
evaluation_output = self.evaluator.evaluate(evaluation_context)
evaluation_outputs = self.evaluator.evaluate(evaluation_context)
(aggregate_score, aggregate_pass, aggregate_reason) = self.evaluator.aggregator(
evaluation_outputs
)
eval_span.set_attributes(
{
"gen_ai.eval.output.score": evaluation_output.score,
"gen_ai.eval.output.test_pass": evaluation_output.test_pass,
"gen_ai.eval.output.reason": evaluation_output.reason or "",
"gen_ai.eval.output.score": aggregate_score,
"gen_ai.eval.output.test_pass": aggregate_pass,
"gen_ai.eval.output.reason": aggregate_reason or "",
}
)

cases.append(evaluation_context.model_dump())
test_passes.append(evaluation_output.test_pass)
scores.append(evaluation_output.score)
reasons.append(evaluation_output.reason or "")
test_passes.append(aggregate_pass)
scores.append(aggregate_score)
reasons.append(aggregate_reason or "")
detailed_results.append(evaluation_outputs)

except Exception as e:
case_span.record_exception(e)
cases.append(case.model_dump())
test_passes.append(False)
scores.append(0)
reasons.append(f"An error occured : {str(e)}")
detailed_results.append([])

report = EvaluationReport(
overall_score=sum(scores) / len(scores) if len(scores) else 0,
scores=scores,
test_passes=test_passes,
cases=cases,
reasons=reasons,
detailed_results=detailed_results,
)

return report
Expand Down Expand Up @@ -395,6 +407,7 @@ async def run_evaluations_async(self, task: Callable, max_workers: int = 10) ->
test_passes = [r["test_pass"] for r in results]
cases = [r["case"] for r in results]
reasons = [r["reason"] for r in results]
detailed_results = [r["detailed_results"] for r in results]

# Create and return report
return EvaluationReport(
Expand All @@ -403,6 +416,7 @@ async def run_evaluations_async(self, task: Callable, max_workers: int = 10) ->
test_passes=test_passes,
cases=cases,
reasons=reasons,
detailed_results=detailed_results,
)

def to_dict(self) -> dict:
Expand Down
18 changes: 17 additions & 1 deletion src/strands_evals/display/display_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from rich.panel import Panel
from rich.prompt import Prompt
from rich.table import Table
from rich.tree import Tree

console = Console()

Expand All @@ -28,7 +29,8 @@ class CollapsibleTableReportDisplay:
"test_pass: bool,
"reason": str,
... # will display everything that's given like actual_output etc.
}
},
"detailed_results": list[EvaluationOutput],
},
"expanded": bool
}
Expand Down Expand Up @@ -93,8 +95,22 @@ def display_items(self):
pass_status,
] + len(other_fields) * ["..."]
table.add_row(*renderables)

console.print(table)

for key, item in self.items.items():
if item["expanded"] and item.get("detailed_results"):
detailed_results = item["detailed_results"]
if len(detailed_results) > 1: # Only show if multiple metrics
tree = Tree(f"[bold cyan]📋 Detailed Metrics for Case {key}[/bold cyan]")
for i, result in enumerate(detailed_results):
status = "✅" if result.test_pass else "❌"
metric_node = tree.add(f"[yellow]Metric {i + 1}[/yellow]: Score={result.score:.2f} {status}")
if result.reason:
metric_node.add(f"[dim]{result.reason}[/dim]")
console.print(tree)
console.print()

def run(self, static: bool = False):
"""
Run the interactive display loop. If static, then the terminal will only display the report.
Expand Down
17 changes: 14 additions & 3 deletions src/strands_evals/evaluators/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ class Evaluator(Generic[InputT, OutputT]):
# Optional: subclasses can set this to enable trace parsing
evaluation_level: EvaluationLevel | None = None

def evaluate(self, evaluation_case: EvaluationData[InputT, OutputT]) -> EvaluationOutput:
def __init__(self):
self.aggregator = self._default_aggregator

@staticmethod
def _default_aggregator(outputs: list[EvaluationOutput]) -> tuple[float, bool, str]:
avg_score = sum(o.score for o in outputs) / len(outputs)
all_pass = all(o.test_pass for o in outputs)
combined_reason = " | ".join(o.reason for o in outputs if o.reason)
return avg_score, all_pass, combined_reason

def evaluate(self, evaluation_case: EvaluationData[InputT, OutputT]) -> list[EvaluationOutput]:
"""
Evaluate the performance of the task on the given test cases.

Expand All @@ -47,7 +57,7 @@ def evaluate(self, evaluation_case: EvaluationData[InputT, OutputT]) -> Evaluati
"""
raise NotImplementedError("This method should be implemented in subclasses.")

async def evaluate_async(self, evaluation_case: EvaluationData[InputT, OutputT]) -> EvaluationOutput:
async def evaluate_async(self, evaluation_case: EvaluationData[InputT, OutputT]) -> list[EvaluationOutput]:
"""
Evaluate the performance of the task on the given test cases asynchronously.

Expand Down Expand Up @@ -193,7 +203,8 @@ def to_dict(self) -> dict:
# Get default values from __init__ signature
sig = inspect.signature(self.__class__.__init__)
defaults = {k: v.default for k, v in sig.parameters.items() if v.default != inspect.Parameter.empty}
exclude_attrs = {"aggregator"}
for k, v in self.__dict__.items():
if not k.startswith("_") and (k not in defaults or v != defaults[k]):
if not k.startswith("_") and k not in exclude_attrs and (k not in defaults or v != defaults[k]):
_dict[k] = v
return _dict
10 changes: 6 additions & 4 deletions src/strands_evals/evaluators/helpfulness_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,23 @@ def __init__(
self.model = model
self.include_inputs = include_inputs

def evaluate(self, evaluation_case: EvaluationData[InputT, OutputT]) -> EvaluationOutput:
def evaluate(self, evaluation_case: EvaluationData[InputT, OutputT]) -> list[EvaluationOutput]:
parsed_input = self._get_last_turn(evaluation_case)
prompt = self._format_prompt(parsed_input)
evaluator_agent = Agent(model=self.model, system_prompt=self.system_prompt, callback_handler=None)
rating = evaluator_agent.structured_output(HelpfulnessRating, prompt)
normalized_score = self._score_mapping[rating.score]
return EvaluationOutput(score=normalized_score, test_pass=normalized_score >= 0.5, reason=rating.reasoning)
result = EvaluationOutput(score=normalized_score, test_pass=normalized_score >= 0.5, reason=rating.reasoning)
return [result]

async def evaluate_async(self, evaluation_case: EvaluationData[InputT, OutputT]) -> EvaluationOutput:
async def evaluate_async(self, evaluation_case: EvaluationData[InputT, OutputT]) -> list[EvaluationOutput]:
parsed_input = self._get_last_turn(evaluation_case)
prompt = self._format_prompt(parsed_input)
evaluator_agent = Agent(model=self.model, system_prompt=self.system_prompt, callback_handler=None)
rating = await evaluator_agent.structured_output_async(HelpfulnessRating, prompt)
normalized_score = self._score_mapping[rating.score]
return EvaluationOutput(score=normalized_score, test_pass=normalized_score >= 0.5, reason=rating.reasoning)
result = EvaluationOutput(score=normalized_score, test_pass=normalized_score >= 0.5, reason=rating.reasoning)
return [result]

def _get_last_turn(self, evaluation_case: EvaluationData[InputT, OutputT]) -> TurnLevelInput:
"""Extract the most recent turn from the conversation for evaluation."""
Expand Down
Loading