feat: Add EvalHub integration #60

ruivieira wants to merge 1 commit into trustyai-explainability:main from
Conversation
- Updated Containerfile to use UBI Python image for better compatibility and to avoid Docker Hub rate limits.
- Modified pip install command to include 'evalhub' extra dependencies.
- Added EvalHub adapter scripts and classes, enabling the package to serve as a standalone module for RAGAS evaluation jobs.
- Updated README to document the new EvalHub adapter functionality.
- Introduced new files for embeddings and LLM wrappers compatible with OpenAI for the EvalHub integration.
Reviewer's Guide

Adds an EvalHub-specific RAGAS adapter so the same image can run both the Llama Stack RAGAS provider (KFP) and EvalHub jobs, including new OpenAI-compatible LLM/embeddings wrappers, a CLI entrypoint, and UBI-based container adjustments, plus README/docs and extras wiring.

Sequence diagram for EvalHub RAGAS benchmark job execution

sequenceDiagram
actor EvalHubUser
participant EvalHubController
participant Kubernetes
participant RagasEvalHubAdapter as RagasEvalHubAdapter(main)
participant DefaultCallbacks
participant RagasCore as RAGAS_evaluate
participant EvalHubOpenAILLM
participant EvalHubOpenAIEmbeddings
participant OpenAIModelAPI as OpenAI_Compat_LLM
participant OpenAIEmbedAPI as OpenAI_Compat_Embeddings
EvalHubUser->>EvalHubController: Configure benchmark and model
EvalHubController->>Kubernetes: Create Job with image and entrypoint ragas-evalhub-adapter
Kubernetes->>RagasEvalHubAdapter: Start container and run main()
RagasEvalHubAdapter->>RagasEvalHubAdapter: Load JobSpec from /meta/job.json
RagasEvalHubAdapter->>DefaultCallbacks: Initialize with job_id, callback_url, oci_auth
RagasEvalHubAdapter->>DefaultCallbacks: report_status(INITIALIZING)
RagasEvalHubAdapter->>RagasEvalHubAdapter: _validate_config(JobSpec)
RagasEvalHubAdapter->>RagasEvalHubAdapter: _resolve_data_path(JobSpec)
RagasEvalHubAdapter->>RagasEvalHubAdapter: _load_dataset(Path)
RagasEvalHubAdapter->>RagasEvalHubAdapter: _apply_column_map / _limit_records
RagasEvalHubAdapter->>DefaultCallbacks: report_status(LOADING_DATA)
RagasEvalHubAdapter->>RagasCore: ragas_evaluate(dataset, metrics, llm, embeddings, run_config)
activate RagasCore
RagasCore->>EvalHubOpenAILLM: generate_text(prompt)
EvalHubOpenAILLM->>OpenAIModelAPI: POST /v1/completions
OpenAIModelAPI-->>EvalHubOpenAILLM: completion text
EvalHubOpenAILLM-->>RagasCore: LLMResult
RagasCore->>EvalHubOpenAIEmbeddings: embed_query / embed_documents
EvalHubOpenAIEmbeddings->>OpenAIEmbedAPI: POST /v1/embeddings
OpenAIEmbedAPI-->>EvalHubOpenAIEmbeddings: embeddings
EvalHubOpenAIEmbeddings-->>RagasCore: vectors
RagasCore-->>RagasEvalHubAdapter: ragas_result
deactivate RagasCore
RagasEvalHubAdapter->>DefaultCallbacks: report_status(POST_PROCESSING)
RagasEvalHubAdapter->>RagasEvalHubAdapter: Aggregate metrics, build EvaluationResult list
alt OCI export configured
RagasEvalHubAdapter->>DefaultCallbacks: create_oci_artifact(OCIArtifactSpec)
DefaultCallbacks-->>RagasEvalHubAdapter: OCI artifact reference
end
RagasEvalHubAdapter-->>EvalHubController: JobResults (overall_score, metrics)
RagasEvalHubAdapter->>DefaultCallbacks: report_results(JobResults)
RagasEvalHubAdapter->>Kubernetes: Exit code 0
Kubernetes-->>EvalHubController: Job completed
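The "Start container and run main()" / "Load JobSpec from /meta/job.json" steps above can be sketched roughly as follows. Note that `load_job_spec`, `run`, and the required-field check are hypothetical stand-ins for illustration, not the adapter's actual entrypoint code:

```python
import json
from pathlib import Path


def load_job_spec(path: Path) -> dict:
    """Read the EvalHub job specification mounted into the container."""
    with path.open() as f:
        return json.load(f)


def run(meta_path: Path = Path("/meta/job.json")) -> int:
    """Minimal entrypoint flow: load the spec and validate it. The real
    adapter would then initialize callbacks with the job id and callback
    URL, report INITIALIZING, and hand off to the RAGAS evaluation."""
    spec = load_job_spec(meta_path)
    missing = {"id", "benchmark_id", "model"} - spec.keys()
    if missing:
        raise ValueError(f"job spec missing fields: {sorted(missing)}")
    return 0  # maps to the container's exit code 0 on success
```

A failed validation surfaces as a non-zero exit, which Kubernetes reports back to the EvalHub controller as a failed Job.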
Class diagram for EvalHub adapter, LLM, and embeddings integration

classDiagram
class FrameworkAdapter {
<<external>>
+JobSpec job_spec
+run_benchmark_job(config, callbacks) JobResults
}
class JobSpec {
<<external>>
+str id
+str benchmark_id
+int benchmark_index
+ModelSpec model
+dict benchmark_config
+ExportsConfig exports
+str provider_id
+str callback_url
+int num_examples
}
class JobCallbacks {
<<external>>
+report_status(update)
+create_oci_artifact(spec) OCIArtifactRef
+report_results(results)
}
class RagasEvalHubAdapter {
+run_benchmark_job(config, callbacks) JobResults
-_validate_config(config) void
-_resolve_data_path(config) Path
-_load_dataset(path) list~dict~
-_apply_column_map(records, column_map) list~dict~
-_limit_records(records, num_examples) list~dict~
}
class EvaluationDataset {
<<external>>
+from_list(records) EvaluationDataset
}
class EvaluationResult {
<<external>>
+str metric_name
+float metric_value
+str metric_type
+int num_samples
+dict metadata
}
class RunConfig {
<<external>>
+int max_workers
}
class BaseRagasLLM {
<<external>>
+BaseRagasLLM(run_config, multiple_completion_supported)
+generate_text(prompt, n, temperature, stop, callbacks) LLMResult
+agenerate_text(prompt, n, temperature, stop, callbacks) LLMResult
}
class EvalHubOpenAILLM {
-str _base_url
-str _model_id
-int _max_tokens
-float _temperature
+EvalHubOpenAILLM(base_url, model_id, max_tokens, temperature, run_config)
+generate_text(prompt, n, temperature, stop, callbacks) LLMResult
+agenerate_text(prompt, n, temperature, stop, callbacks) LLMResult
+get_temperature(n) float
-_client() Any
}
class BaseRagasEmbeddings {
<<external>>
+set_run_config(run_config) void
+embed_query(text) list~float~
+embed_documents(texts) list~list~float~~
}
class EvalHubOpenAIEmbeddings {
-str _base_url
-str _model_id
+EvalHubOpenAIEmbeddings(base_url, model_id, run_config)
+embed_query(text) list~float~
+embed_documents(texts) list~list~float~~
+aembed_query(text) list~float~
+aembed_documents(texts) list~list~float~~
-_client() Any
-_validate_embedding(embedding) list~float~
}
class METRIC_MAPPING {
<<module>>
+dict~str, Metric~
}
class OpenAI {
<<external>>
+completions
+embeddings
}
FrameworkAdapter <|-- RagasEvalHubAdapter
JobSpec --> ModelSpec : uses
RagasEvalHubAdapter --> JobSpec : consumes
RagasEvalHubAdapter --> JobCallbacks : uses
RagasEvalHubAdapter --> EvaluationDataset : builds
RagasEvalHubAdapter --> EvaluationResult : aggregates
RagasEvalHubAdapter --> RunConfig : configures
RagasEvalHubAdapter --> EvalHubOpenAILLM : constructs
RagasEvalHubAdapter --> EvalHubOpenAIEmbeddings : constructs
RagasEvalHubAdapter --> METRIC_MAPPING : selects metrics
BaseRagasLLM <|-- EvalHubOpenAILLM
EvalHubOpenAILLM --> OpenAI : calls
BaseRagasEmbeddings <|-- EvalHubOpenAIEmbeddings
EvalHubOpenAIEmbeddings --> OpenAI : calls
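The private `_apply_column_map` and `_limit_records` helpers in the diagram are not shown on this page; under the assumed semantics (rename columns per record, then truncate to the requested sample count) they might look like this sketch:

```python
from typing import Any, Optional


def apply_column_map(
    records: list[dict[str, Any]], column_map: dict[str, str]
) -> list[dict[str, Any]]:
    """Rename dataset columns on every record, e.g. {"question": "user_input"}."""
    return [
        {column_map.get(key, key): value for key, value in record.items()}
        for record in records
    ]


def limit_records(
    records: list[dict[str, Any]], num_examples: Optional[int]
) -> list[dict[str, Any]]:
    """Truncate the dataset when the job spec requests a sample limit."""
    if num_examples is None or num_examples <= 0:
        return records
    return records[:num_examples]
```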
Hey - I've found 2 issues, and left some high level feedback:
- The `metric_names` handling in `RagasEvalHubAdapter.run_benchmark_job` silently drops unknown metric names; consider logging a warning or raising if requested metrics are not found in `METRIC_MAPPING` so misconfigurations are easier to detect.
- The `_get_api_key` helper is duplicated in both `evalhub.llm` and `evalhub.embeddings`; consider moving it into a shared utility module to avoid divergence in future changes.
- The data directories `/test_data` and `/data` (and `DEFAULT_DATASET_FILENAME`) are currently hardcoded in the adapter; consider allowing these to be overridden via environment variables or benchmark config to make the adapter more flexible across environments.
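The shared utility the second bullet asks for could be as small as the sketch below. The environment variable names and the `"dummy"` fallback (a common convention for OpenAI-compatible endpoints that don't enforce authentication) are assumptions, not the provider's actual behavior:

```python
import os


def get_api_key(
    env_vars: tuple[str, ...] = ("OPENAI_API_KEY", "EVALHUB_API_KEY")
) -> str:
    """Return the first non-empty API key found in the environment, falling
    back to a placeholder for endpoints that do not check authentication."""
    for name in env_vars:
        value = os.environ.get(name)
        if value:
            return value
    return "dummy"
```

Both `evalhub.llm` and `evalhub.embeddings` could then import this one function instead of carrying their own copies.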
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `metric_names` handling in `RagasEvalHubAdapter.run_benchmark_job` silently drops unknown metric names; consider logging a warning or raising if requested metrics are not found in `METRIC_MAPPING` so misconfigurations are easier to detect.
- The `_get_api_key` helper is duplicated in both `evalhub.llm` and `evalhub.embeddings`; consider moving this into a shared utility module to avoid divergence in future changes.
- The data directories `/test_data` and `/data` (and `DEFAULT_DATASET_FILENAME`) are currently hardcoded in the adapter; consider allowing these to be overridden via environment variables or benchmark config to make the adapter more flexible across environments.
## Individual Comments
### Comment 1
<location path="src/llama_stack_provider_ragas/evalhub/adapter.py" line_range="149" />
<code_context>
+ column_map = bc.get("column_map")
+ if isinstance(column_map, dict):
+ records = _apply_column_map(records, column_map)
+ records = _limit_records(records, config.num_examples)
+ if not records:
+ raise ValueError(f"No records in dataset at {data_path} (or after limit)")
</code_context>
<issue_to_address>
**suggestion (performance):** Limit is applied after loading full dataset; for large JSONL this can be unnecessarily expensive.
For `.jsonl`, consider passing `config.num_examples` into `_load_dataset` and stopping iteration once that many records are read, instead of loading all rows first. You can special-case this for `.jsonl` while keeping the current behavior for `.json`, where partial reads are trickier.
Suggested implementation:
```python
data_path = _resolve_data_path(config)
# For large JSONL files, avoid loading the full dataset when a limit is set.
# We pass the limit through to the loader so it can stop iterating early.
num_examples = getattr(config, "num_examples", None)
if str(data_path).endswith(".jsonl") and num_examples:
records = _load_dataset(data_path, limit=num_examples)
else:
records = _load_dataset(data_path)
column_map = bc.get("column_map")
if isinstance(column_map, dict):
records = _apply_column_map(records, column_map)
# Keep in-memory limiting as a safety net, including for non-JSONL formats.
records = _limit_records(records, num_examples)
```
To fully implement the optimization, you will also need to update the `_load_dataset` implementation to accept and use the new `limit` parameter for `.jsonl`:
1. Update the `_load_dataset` function signature to accept an optional `limit`:
- From: `def _load_dataset(path: Union[str, Path]) -> List[Dict[str, Any]]:`
- To: `def _load_dataset(path: Union[str, Path], limit: Optional[int] = None) -> List[Dict[str, Any]]:`
2. Inside `_load_dataset`, special-case `.jsonl`:
- When `path` ends with `.jsonl` and `limit` is not `None`, iterate over the file line by line, `json.loads` each line, append to `records`, and `break` once `len(records) >= limit`.
- For `.json` or other formats, keep the current behavior of loading the entire file; ignore `limit` in those cases.
3. Ensure all other call sites of `_load_dataset` in the codebase are updated (or left as-is) so that they either:
- Continue calling `_load_dataset(path)` with no `limit`, or
- Explicitly pass `limit=` if they want streaming/early stop behavior for `.jsonl`.
</issue_to_address>
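The numbered steps above can be condensed into a sketch like the following (hypothetical `load_dataset`; the real `_load_dataset` signature and error handling may differ):

```python
import json
from pathlib import Path
from typing import Any, Optional


def load_dataset(path: Path, limit: Optional[int] = None) -> list[dict[str, Any]]:
    """Load records from a dataset file. For .jsonl, stop reading once
    `limit` records have been parsed; .json files are always loaded whole,
    since a partial read of a JSON array is trickier."""
    if path.suffix == ".jsonl":
        records: list[dict[str, Any]] = []
        with path.open() as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                records.append(json.loads(line))
                if limit is not None and len(records) >= limit:
                    break  # early stop: the rest of the file is never read
        return records
    data = json.loads(path.read_text())
    return data if isinstance(data, list) else [data]
```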
### Comment 2
<location path="src/llama_stack_provider_ragas/evalhub/adapter.py" line_range="160-164" />
<code_context>
+ list(records[0].keys()) if records else [],
+ )
+
+ metric_names = bc.get("metrics") or bc.get("scoring_functions") or list(METRIC_MAPPING.keys())
+ metrics = [METRIC_MAPPING[name] for name in metric_names if name in METRIC_MAPPING]
+ if not metrics:
+ metrics = list(METRIC_MAPPING.values())
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silently dropping unknown metric names can make configuration issues hard to diagnose.
Right now, any metric name not in `METRIC_MAPPING` is silently skipped and the run proceeds with the remaining metrics (or all metrics if none match). Consider either logging a warning listing the unknown names or raising a configuration error when they’re present, so misconfigured `metrics` / `scoring_functions` are easier to detect.
```suggestion
metric_names = bc.get("metrics") or bc.get("scoring_functions") or list(METRIC_MAPPING.keys())
unknown_metric_names = [name for name in metric_names if name not in METRIC_MAPPING]
if unknown_metric_names:
logger.warning(
"Unknown metric names in configuration: %s. These will be ignored. Known metrics: %s",
unknown_metric_names,
list(METRIC_MAPPING.keys()),
)
metrics = [METRIC_MAPPING[name] for name in metric_names if name in METRIC_MAPPING]
if not metrics:
metrics = list(METRIC_MAPPING.values())
if metric_names:
logger.warning(
"No valid metric names found in configuration (requested: %s). "
"Falling back to default RAGAS metrics.",
metric_names,
)
logger.info("Using default RAGAS metrics")
```
</issue_to_address>
    provider_id=adapter.job_spec.provider_id,
    sidecar_url=adapter.job_spec.callback_url,
    oci_auth_config_path=Path(oci_auth) if oci_auth else None,
    oci_insecure=os.environ.get("OCI_REGISTRY_INSECURE", "false").lower() == "true",
The default env variable name in eval-hub-sdk's `AdaptorSettings` is `OCI_INSECURE`, not `OCI_REGISTRY_INSECURE`:
    # OCI registry configuration
    oci_auth_config_path: Path | None = Field(
        default=None, validation_alias="OCI_AUTH_CONFIG_PATH"
    )
    oci_insecure: bool = Field(default=False, validation_alias="OCI_INSECURE")
@ruivieira thank you for this! i think you already hinted at this earlier, but it seems this pr is purely additive and parallel in the sense that it's creating a new adapter that only requires ragas and nothing from this provider. like you said we should move to eval-hub contrib. happy to post a pr! let me know.
trustyai-explainability/trustyai-service-operator#664 depends on this PR
Summary by Sourcery
Add an EvalHub-compatible RAGAS adapter so the project’s container image can be used both as a Llama Stack RAGAS provider and as an EvalHub RAGAS evaluation job entrypoint.
New Features:
- A `ragas-evalhub-adapter` console entrypoint that runs RAGAS evaluations as an EvalHub framework adapter using EvalHub job specs.

Enhancements:
- An `evalhub` extra alongside the `remote` dependencies.

Documentation: