diff --git a/Containerfile b/Containerfile
index ad0303c6..d6ec768f 100644
--- a/Containerfile
+++ b/Containerfile
@@ -1,8 +1,12 @@
-# Edited from the KFP generated Dockerfile.
-FROM python:3.12
+# Single image for both Llama Stack RAGAS provider (KFP) and EvalHub RAGAS adapter.
+# Install [remote] for Kubeflow Pipelines; [evalhub] for EvalHub job entrypoint (ragas-evalhub-adapter).
+# Use UBI Python to avoid Docker Hub rate limits and for OpenShift compatibility.
+FROM registry.access.redhat.com/ubi9/python-312:latest
+
+USER 0
 
 WORKDIR /usr/local/src/kfp/components
 
 COPY . .
 
-RUN pip install --no-cache-dir -e ".[remote]"
+RUN pip install --no-cache-dir -e ".[remote,evalhub]"
diff --git a/README.md b/README.md
index 1552bae9..aa3cca39 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ There are two versions of the provider:
 - `inline`: runs the Ragas evaluation in the same process as the Llama Stack server. This is always available with the base installation.
 - `remote`: runs the Ragas evaluation in a remote process, using Kubeflow Pipelines. Only available when remote dependencies are installed with `pip install llama-stack-provider-ragas[remote]`.
 
+**EvalHub adapter** (standalone module, same container): the package also provides an EvalHub framework adapter in `llama_stack_provider_ragas.evalhub`. It is not part of the Llama Stack provider; EvalHub invokes it as the Job entrypoint (`ragas-evalhub-adapter`). Install with `pip install llama-stack-provider-ragas[evalhub]`. The same container image built from this repo can serve both the Llama Stack provider and EvalHub RAGAS jobs.
+
 ## Prerequisites
 - Python 3.12
 - [uv](https://docs.astral.sh/uv/)
diff --git a/pyproject.toml b/pyproject.toml
index 8a6f5f5f..3400c645 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,9 @@ dependencies = [
     "llama-stack-client>=0.5.0",
 ]
 
+[project.scripts]
+ragas-evalhub-adapter = "llama_stack_provider_ragas.evalhub.adapter:main"
+
 [project.urls]
 homepage = "https://github.com/trustyai-explainability/llama-stack-provider-ragas"
 repository = "https://github.com/trustyai-explainability/llama-stack-provider-ragas"
@@ -43,6 +46,7 @@ repository = "https://github.com/trustyai-explainability/llama-stack-provider-ra
 [project.optional-dependencies]
 remote = ["kfp>=2.5.0", "kfp-kubernetes>=2.0.0", "kfp-pipeline-spec>=2.0.0", "kfp-server-api>=2.0.0", "s3fs>=2024.12.0", "kubernetes>=30.0.0"]
 distro = ["opentelemetry-api", "opentelemetry-exporter-otlp", "aiosqlite", "ollama", "uvicorn"]
+evalhub = ["eval-hub-sdk[adapter]>=0.1.0", "openai>=1.0.0"]
 dev = [
     "llama-stack-provider-ragas[distro]",
     "llama-stack-provider-ragas[remote]",
diff --git a/src/llama_stack_provider_ragas/evalhub/__init__.py b/src/llama_stack_provider_ragas/evalhub/__init__.py
new file mode 100644
index 00000000..a9c0aa41
--- /dev/null
+++ b/src/llama_stack_provider_ragas/evalhub/__init__.py
@@ -0,0 +1,10 @@
+"""EvalHub RAGAS adapter — standalone module run as Job entrypoint.
+
+This package is part of llama-stack-provider-ragas but is not part of the
+Llama Stack provider. It shares the container and dependencies (ragas, etc.)
+and is invoked by EvalHub when running RAGAS evaluation jobs.
+"""
+
+from .adapter import main
+
+__all__ = ["main"]
diff --git a/src/llama_stack_provider_ragas/evalhub/adapter.py b/src/llama_stack_provider_ragas/evalhub/adapter.py
new file mode 100644
index 00000000..2daf710d
--- /dev/null
+++ b/src/llama_stack_provider_ragas/evalhub/adapter.py
@@ -0,0 +1,343 @@
+"""RAGAS framework adapter for EvalHub.
+ +Reads the evaluation dataset from /test_data (when populated by S3 init container) +or /data and runs RAGAS metrics against the model in the job spec. +Uses eval-hub-sdk (installed with the evalhub extra). +""" + +from __future__ import annotations + +import json +import logging +import time +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from ragas import EvaluationDataset, evaluate as ragas_evaluate +from ragas.run_config import RunConfig + +from evalhub.adapter import ( + FrameworkAdapter, + JobCallbacks, + JobPhase, + JobResults, + JobSpec, + JobStatus, + JobStatusUpdate, + OCIArtifactSpec, +) +from evalhub.adapter.models.job import ErrorInfo, MessageInfo +from evalhub.models.api import EvaluationResult + +from llama_stack_provider_ragas.constants import METRIC_MAPPING + +from .embeddings import EvalHubOpenAIEmbeddings +from .llm import EvalHubOpenAILLM + +logger = logging.getLogger(__name__) + +# When test_data_ref.s3 is set, EvalHub's init container downloads objects here +TEST_DATA_DIR = Path("/test_data") +DEFAULT_DATA_DIR = Path("/data") +DEFAULT_DATASET_FILENAME = "dataset.jsonl" +_DATA_SUFFIXES = (".jsonl", ".json") + + +def _first_dataset_in_dir(path: Path) -> Path | None: + """Return the first dataset file (.jsonl or .json) in path, or None.""" + if not path.exists() or not path.is_dir(): + return None + for f in sorted(path.iterdir()): + if f.suffix.lower() in _DATA_SUFFIXES and f.is_file(): + return f + return None + + +def _resolve_data_path(config: JobSpec) -> Path: + bc = config.benchmark_config or {} + explicit = bc.get("data_path") + if explicit: + p = Path(explicit) + if p.is_absolute(): + return p + return DEFAULT_DATA_DIR / explicit + # Prefer /test_data when populated by S3 init container (custom data) + test_data_file = TEST_DATA_DIR / DEFAULT_DATASET_FILENAME + if test_data_file.exists(): + return test_data_file + first_in_test = _first_dataset_in_dir(TEST_DATA_DIR) + if first_in_test is not None: + return first_in_test + # Fall back to /data (e.g. 
when no test_data_ref) + default_file = DEFAULT_DATA_DIR / DEFAULT_DATASET_FILENAME + if default_file.exists(): + return default_file + first_in_data = _first_dataset_in_dir(DEFAULT_DATA_DIR) + if first_in_data is not None: + return first_in_data + return default_file + + +def _load_dataset(path: Path) -> list[dict[str, Any]]: + if not path.exists(): + raise FileNotFoundError(f"Dataset not found: {path}") + suffix = path.suffix.lower() + with open(path) as f: + if suffix == ".jsonl": + return [json.loads(line) for line in f if line.strip()] + if suffix == ".json": + data = json.load(f) + if isinstance(data, list): + return data + if isinstance(data, dict) and "data" in data: + return data["data"] + raise ValueError(f"JSON dataset must be a list or {{'data': list}}, got {type(data)}") + raise ValueError(f"Unsupported dataset format: {path.suffix}") + + +def _apply_column_map(records: list[dict[str, Any]], column_map: dict[str, str] | None) -> list[dict[str, Any]]: + if not column_map: + return records + return [{column_map.get(k, k): v for k, v in row.items()} for row in records] + + +def _limit_records(records: list[dict[str, Any]], num_examples: int | None) -> list[dict[str, Any]]: + if num_examples is None or num_examples <= 0: + return records + return records[:num_examples] + + +class RagasEvalHubAdapter(FrameworkAdapter): + """EvalHub framework adapter that runs RAGAS evaluation using data from /test_data or /data.""" + + def run_benchmark_job(self, config: JobSpec, callbacks: JobCallbacks) -> JobResults: + start_time = time.time() + logger.info("Starting RAGAS EvalHub job %s for benchmark %s", config.id, config.benchmark_id) + + try: + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.RUNNING, + phase=JobPhase.INITIALIZING, + progress=0.0, + message=MessageInfo( + message="Validating configuration and resolving data path", + message_code="initializing", + ), + ) + ) + self._validate_config(config) + bc = config.benchmark_config or {} + + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.RUNNING, + phase=JobPhase.LOADING_DATA, + progress=0.15, + message=MessageInfo( + message="Loading dataset from /test_data or /data", + message_code="loading_data", + ), + ) + ) + data_path = _resolve_data_path(config) + records = _load_dataset(data_path) + column_map = bc.get("column_map") + if isinstance(column_map, dict): + records = _apply_column_map(records, column_map) + records = _limit_records(records, config.num_examples) + if not records: + raise ValueError(f"No records in dataset at {data_path} (or after limit)") + eval_dataset = EvaluationDataset.from_list(records) + logger.info( + "Dataset loaded: path=%s records=%d columns=%s", + data_path, + len(records), + list(records[0].keys()) if records else [], + ) + + metric_names = bc.get("metrics") or bc.get("scoring_functions") or list(METRIC_MAPPING.keys()) + metrics = [METRIC_MAPPING[name] for name in metric_names if name in METRIC_MAPPING] + if not metrics: + metrics = list(METRIC_MAPPING.values()) + logger.info("Using default RAGAS metrics") + + model_url = config.model.url.strip().rstrip("/") + model_name = config.model.name + embedding_model = bc.get("embedding_model") or model_name + embedding_url = bc.get("embedding_url") or model_url + run_config = RunConfig(max_workers=1) + llm = EvalHubOpenAILLM( + base_url=model_url, + model_id=model_name, + max_tokens=bc.get("max_tokens"), + temperature=bc.get("temperature"), + run_config=run_config, + ) + embeddings = EvalHubOpenAIEmbeddings( + base_url=embedding_url, + 
model_id=embedding_model, + run_config=run_config, + ) + + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.RUNNING, + phase=JobPhase.RUNNING_EVALUATION, + progress=0.3, + message=MessageInfo( + message=f"Running RAGAS evaluation ({len(metrics)} metrics)", + message_code="running_evaluation", + ), + ) + ) + ragas_result = ragas_evaluate( + dataset=eval_dataset, + metrics=metrics, + llm=llm, + embeddings=embeddings, + run_config=run_config, + ) + + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.RUNNING, + phase=JobPhase.POST_PROCESSING, + progress=0.85, + message=MessageInfo( + message="Processing RAGAS results", + message_code="post_processing", + ), + ) + ) + result_df = ragas_result.to_pandas() + n_evaluated = len(result_df) + evaluation_results: list[EvaluationResult] = [] + scores_for_overall: list[float] = [] + for metric_name in [m.name for m in metrics]: + if metric_name not in result_df.columns: + continue + series = result_df[metric_name].dropna() + values = series.tolist() + if not values: + continue + avg = sum(values) / len(values) + scores_for_overall.append(avg) + evaluation_results.append( + EvaluationResult( + metric_name=metric_name, + metric_value=round(avg, 6), + metric_type="float", + num_samples=len(values), + metadata={"min": min(values), "max": max(values)}, + ) + ) + overall_score = sum(scores_for_overall) / len(scores_for_overall) if scores_for_overall else None + + oci_artifact = None + if config.exports and config.exports.oci: + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.RUNNING, + phase=JobPhase.PERSISTING_ARTIFACTS, + progress=0.9, + message=MessageInfo( + message="Persisting results to OCI", + message_code="persisting_artifacts", + ), + ) + ) + results_dir = Path("/tmp/ragas_evalhub_results") / config.id + results_dir.mkdir(parents=True, exist_ok=True) + results_file = results_dir / "results.jsonl" + result_df.to_json(results_file, orient="records", lines=True) + oci_artifact = callbacks.create_oci_artifact( + OCIArtifactSpec( + files_path=results_dir, + coordinates=config.exports.oci.coordinates, + ) + ) + + duration = time.time() - start_time + return JobResults( + id=config.id, + benchmark_id=config.benchmark_id, + benchmark_index=config.benchmark_index, + model_name=config.model.name, + results=evaluation_results, + overall_score=overall_score, + num_examples_evaluated=n_evaluated, + duration_seconds=round(duration, 2), + completed_at=datetime.now(UTC), + evaluation_metadata={ + "framework": "ragas", + "data_path": str(data_path), + "metrics": [m.name for m in metrics], + }, + oci_artifact=oci_artifact, + ) + + except Exception as e: + logger.exception("RAGAS EvalHub job %s failed", config.id) + callbacks.report_status( + JobStatusUpdate( + status=JobStatus.FAILED, + error=ErrorInfo(message=str(e), message_code="job_failed"), + error_details={"exception_type": type(e).__name__}, + ) + ) + raise + + def _validate_config(self, config: JobSpec) -> None: + if not config.benchmark_id: + raise ValueError("benchmark_id is required") + if not config.model or not config.model.url: + raise ValueError("model.url is required") + if not config.model.name: + raise ValueError("model.name is required") + + +def main() -> None: + """Entry point for running the adapter as an EvalHub K8s Job.""" + import os + import sys + + from evalhub.adapter import DefaultCallbacks + + log_level = os.environ.get("LOG_LEVEL", "INFO").upper() + logging.basicConfig( + level=getattr(logging, log_level, logging.INFO), + format="%(asctime)s - 
%(name)s - %(levelname)s - %(message)s",
    )

    job_spec_path = os.environ.get("EVALHUB_JOB_SPEC_PATH", "/meta/job.json")
    try:
        adapter = RagasEvalHubAdapter(job_spec_path=job_spec_path)
        logger.info("Loaded job %s", adapter.job_spec.id)
        logger.info("Benchmark: %s", adapter.job_spec.benchmark_id)

        oci_auth = os.environ.get("OCI_AUTH_CONFIG_PATH")
        callbacks = DefaultCallbacks(
            job_id=adapter.job_spec.id,
            benchmark_id=adapter.job_spec.benchmark_id,
            benchmark_index=adapter.job_spec.benchmark_index,
            provider_id=adapter.job_spec.provider_id,
            sidecar_url=adapter.job_spec.callback_url,
            oci_auth_config_path=Path(oci_auth) if oci_auth else None,
            oci_insecure=os.environ.get("OCI_REGISTRY_INSECURE", "false").lower() == "true",
        )

        results = adapter.run_benchmark_job(adapter.job_spec, callbacks)
        logger.info("Job completed: %s", results.id)
        callbacks.report_results(results)
        sys.exit(0)
    except FileNotFoundError as e:
        logger.error("Job spec not found: %s", e)
        sys.exit(1)
    except ValueError as e:
        logger.error("Configuration error: %s", e)
        sys.exit(1)
    except Exception:
        logger.exception("Job failed")
        sys.exit(1)
diff --git a/src/llama_stack_provider_ragas/evalhub/embeddings.py b/src/llama_stack_provider_ragas/evalhub/embeddings.py
new file mode 100644
index 00000000..a05997f3
--- /dev/null
+++ b/src/llama_stack_provider_ragas/evalhub/embeddings.py
@@ -0,0 +1,85 @@
+"""OpenAI-compatible embeddings wrapper for EvalHub adapter."""

from __future__ import annotations

import logging
import os
from typing import Any

from ragas.embeddings.base import BaseRagasEmbeddings
from ragas.run_config import RunConfig

logger = logging.getLogger(__name__)

try:
    from openai import OpenAI
    _HAS_OPENAI = True
except ImportError:
    _HAS_OPENAI = False


def _get_api_key() -> str:
    return (
        os.environ.get("OPENAICOMPATIBLE_API_KEY")
        or os.environ.get("OPENAI_API_KEY")
        or "DUMMY"
    )


class EvalHubOpenAIEmbeddings(BaseRagasEmbeddings):
    """RAGAS embeddings that call an OpenAI-compatible embeddings endpoint."""

    def __init__(
        self,
        base_url: str,
        model_id: str,
        *,
        run_config: RunConfig | None = None,
    ):
        super().__init__()
        self._base_url = base_url.rstrip("/")
        if not self._base_url.endswith("/v1"):
            self._base_url = f"{self._base_url}/v1"
        self._model_id = model_id
        if run_config is None:
            run_config = RunConfig()
        self.set_run_config(run_config)

    def _client(self) -> Any:
        if not _HAS_OPENAI:
            raise RuntimeError(
                "OpenAI package is required. 
Install with: pip install llama-stack-provider-ragas[evalhub]"
            )
        return OpenAI(base_url=self._base_url, api_key=_get_api_key())

    def _validate_embedding(self, embedding: list[float] | str) -> list[float]:
        if isinstance(embedding, str):
            raise ValueError("Expected float embeddings, got base64 string")
        return embedding

    def embed_query(self, text: str) -> list[float]:
        client = self._client()
        try:
            r = client.embeddings.create(input=text, model=self._model_id)
            data = r.data
            if not data:
                raise ValueError("Embeddings response had no data")
            return self._validate_embedding(data[0].embedding)
        except Exception as e:
            logger.error("Embed query failed: %s", e)
            raise

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        client = self._client()
        try:
            r = client.embeddings.create(input=texts, model=self._model_id)
            return [self._validate_embedding(d.embedding) for d in r.data]
        except Exception as e:
            logger.error("Embed documents failed: %s", e)
            raise

    async def aembed_query(self, text: str) -> list[float]:
        return self.embed_query(text)

    async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
        return self.embed_documents(texts)
diff --git a/src/llama_stack_provider_ragas/evalhub/llm.py b/src/llama_stack_provider_ragas/evalhub/llm.py
new file mode 100644
index 00000000..d0809f33
--- /dev/null
+++ b/src/llama_stack_provider_ragas/evalhub/llm.py
@@ -0,0 +1,116 @@
+"""OpenAI-compatible LLM wrapper for EvalHub adapter."""

from __future__ import annotations

import logging
import os
from typing import Any

from langchain_core.outputs import Generation, LLMResult
from langchain_core.prompt_values import PromptValue
from ragas.llms.base import BaseRagasLLM
from ragas.run_config import RunConfig

logger = logging.getLogger(__name__)

try:
    from openai import OpenAI
    _HAS_OPENAI = True
except ImportError:
    _HAS_OPENAI = False


def _get_api_key() -> str:
    return (
        os.environ.get("OPENAICOMPATIBLE_API_KEY")
        or os.environ.get("OPENAI_API_KEY")
        or "DUMMY"
    )


class EvalHubOpenAILLM(BaseRagasLLM):
    """RAGAS LLM that calls an OpenAI-compatible completions endpoint from EvalHub JobSpec."""

    def __init__(
        self,
        base_url: str,
        model_id: str,
        *,
        max_tokens: int | None = None,
        temperature: float | None = None,
        run_config: RunConfig | None = None,
    ):
        if run_config is None:
            run_config = RunConfig()
        super().__init__(run_config, multiple_completion_supported=True)
        self._base_url = base_url.rstrip("/")
        self._model_id = model_id
        self._max_tokens = max_tokens
        self._temperature = temperature

    def _client(self) -> Any:
        if not _HAS_OPENAI:
            raise RuntimeError(
                "OpenAI package is required. 
Install with: pip install llama-stack-provider-ragas[evalhub]" + ) + return OpenAI( + base_url=f"{self._base_url}/v1" if not self._base_url.endswith("/v1") else self._base_url, + api_key=_get_api_key(), + ) + + def generate_text( + self, + prompt: PromptValue, + n: int = 1, + temperature: float | None = None, + stop: list[str] | None = None, + callbacks: Any = None, + ) -> LLMResult: + client = self._client() + kwargs = { + "model": self._model_id, + "prompt": prompt.to_string(), + "n": n, + } + if self._max_tokens is not None: + kwargs["max_tokens"] = self._max_tokens + t = temperature if temperature is not None else self._temperature + if t is not None: + kwargs["temperature"] = t + if stop: + kwargs["stop"] = stop + + try: + response = client.completions.create(**kwargs) + except Exception as e: + logger.error("Completion request failed: %s", e) + raise + + generations = [] + for choice in getattr(response, "choices", []) or []: + text = getattr(choice, "text", "") or "" + generations.append(Generation(text=text)) + + if not generations: + generations = [Generation(text="")] + + return LLMResult(generations=[generations], llm_output={"provider": "evalhub_openai"}) + + def is_finished(self, response: LLMResult) -> bool: + """Check if the LLM response is finished. Completions API returns full response, so always True.""" + return True + + async def agenerate_text( + self, + prompt: PromptValue, + n: int = 1, + temperature: float | None = None, + stop: list[str] | None = None, + callbacks: Any = None, + ) -> LLMResult: + return self.generate_text(prompt, n=n, temperature=temperature, stop=stop, callbacks=callbacks) + + def get_temperature(self, n: int) -> float: + if self._temperature is not None: + return self._temperature + return 0.3 if n > 1 else 1e-8
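
Reviewer note: a minimal sketch of the inputs the adapter consumes. It assumes the ragas 0.2-style column names (`user_input`, `retrieved_contexts`, `response`, `reference`) that `EvaluationDataset.from_list` accepts; the metric keys shown are placeholders, since the real names come from `METRIC_MAPPING` in `llama_stack_provider_ragas.constants`.

```python
# Illustrative only: writes a dataset.jsonl of the shape _load_dataset expects and
# shows the benchmark_config keys the adapter reads. Metric names are placeholders
# standing in for whatever METRIC_MAPPING actually exposes.
import json
from pathlib import Path

records = [
    {
        "user_input": "What is the capital of France?",
        "retrieved_contexts": ["Paris is the capital and largest city of France."],
        "response": "The capital of France is Paris.",
        "reference": "Paris is the capital of France.",
    },
]

# In the job container this file would sit at /data/dataset.jsonl, or under
# /test_data when an S3 test_data_ref is configured; written locally here.
out = Path("dataset.jsonl")
with out.open("w") as f:
    for row in records:
        f.write(json.dumps(row) + "\n")

benchmark_config = {
    "metrics": ["faithfulness", "answer_relevancy"],  # placeholder METRIC_MAPPING keys
    "column_map": {"question": "user_input"},  # only needed when source columns use different names
    "embedding_model": "example-embedding-model",  # optional; defaults to the job's model.name
    "max_tokens": 512,
    "temperature": 0.0,
}
print(json.dumps(benchmark_config, indent=2))
```

Records already in ragas column form need no `column_map`; the row limit comes from `num_examples` on the job spec rather than from `benchmark_config`.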