Merged
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -32,4 +32,4 @@ repos:
language: system
pass_filenames: false
always_run: true
args: [-c, 'KUBEFLOW_BASE_IMAGE=dummy uv run pytest -v -m "not lls_integration and not kfp_integration" --tb=short --maxfail=3; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret']
args: [-c, 'KUBEFLOW_BASE_IMAGE=dummy uv run pytest -v -m "unit or lls_integration" --tb=short --maxfail=3; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret']
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -64,8 +64,9 @@ addopts = "-v"
log_cli = true
log_cli_level = "INFO"
markers = [
"unit: Unit tests for wrapper classes (mocked client by default)",
"lls_integration: Llama Stack integration tests",
"kfp_integration: Kubeflow Pipelines integration tests",
"e2e: End-to-end tests against a deployed Llama Stack distribution on OpenShift",
]

[tool.ruff]
60 changes: 60 additions & 0 deletions tests/TESTING.md
@@ -0,0 +1,60 @@
# Testing

All test files live under `tests/`. Shared evaluation logic (smoke checks, eval job polling) is factored into `base_eval_tests.py`, which is not collected by pytest directly.

## Unit tests (`test_remote_wrappers.py`, pytest marker `unit`)

Tests the LangChain-compatible wrapper classes (`LlamaStackRemoteLLM` and `LlamaStackRemoteEmbeddings`) that the remote provider uses for inference. By default, the `LlamaStackClient` is mocked — no running server is required.

```bash
uv run pytest tests/test_remote_wrappers.py
```

Pass `--no-mock-client` to use a real `LlamaStackClient` against a running Llama Stack server (defaults to `http://localhost:8321`). Model IDs can be overridden with `INFERENCE_MODEL` and `EMBEDDING_MODEL`.

```bash
uv run pytest tests/test_remote_wrappers.py --no-mock-client
```
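The mocked mode presumably stubs out the client's inference calls so wrapper logic runs without a server. A minimal sketch of the idea with `unittest.mock` (the fixture wiring lives in the test suite itself; the `chat_completion` call shape below is an assumption, not the suite's code):

```python
from unittest.mock import MagicMock

# Illustrative sketch: a MagicMock stands in for LlamaStackClient so the
# wrapper logic can be exercised without a running server.
mock_client = MagicMock()
mock_client.inference.chat_completion.return_value = MagicMock(
    completion_message=MagicMock(content="mocked response")
)

# Any call on the mock returns the canned completion, regardless of arguments.
response = mock_client.inference.chat_completion(
    model_id="litellm/Mistral-Small-24B-W8A8",
    messages=[{"role": "user", "content": "hello"}],
)
```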

## Integration tests (`test_inline_evaluation.py`, pytest marker `lls_integration`)

Tests the eval providers through an in-process Llama Stack server using `LlamaStackAsLibraryClient`. The stack configuration (providers, models, storage) is built entirely in fixtures. By default, Ollama connectivity and inference are mocked.

```bash
uv run pytest tests/test_inline_evaluation.py
```

Pass `--no-mock-inference` to use a real Ollama instance for inference:

```bash
INFERENCE_MODEL=ollama/granite3.3:2b \
EMBEDDING_MODEL=ollama/all-minilm:latest \
uv run pytest tests/test_inline_evaluation.py --no-mock-inference
```

## End-to-end tests (`test_e2e.py`, pytest marker `e2e`)

Tests against a fully deployed Llama Stack distribution on an OpenShift cluster. Requires the cluster environment from `cluster-deployment/` to be set up and a port-forward to the Llama Stack service:

```bash
oc port-forward -n ragas-test svc/lsd-ragas-test-service 8321:8321
uv run pytest tests/test_e2e.py
```

These tests exercise both the inline and remote eval providers through the Llama Stack eval API, including dataset registration, benchmark creation, and eval job execution with result polling.
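The result-polling step can be sketched in isolation with a stand-in for `client.alpha.eval.jobs` (the fake class and `wait_for_job` helper below are illustrative, mirroring the polling loop in `base_eval_tests.py`):

```python
import time


class FakeEvalJobs:
    """Stand-in for client.alpha.eval.jobs: reports completion after N polls."""

    def __init__(self, polls_until_done: int = 3):
        self._remaining = polls_until_done

    def status(self, benchmark_id: str, job_id: str):
        self._remaining -= 1
        state = "completed" if self._remaining <= 0 else "in_progress"
        # Minimal job object exposing only the .status attribute the poller reads.
        return type("Job", (), {"status": state})()


def wait_for_job(jobs, benchmark_id, job_id, poll_interval=0.01, timeout=1.0):
    """Poll until the job reaches a terminal state or the deadline passes."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        job = jobs.status(benchmark_id=benchmark_id, job_id=job_id)
        if job.status in ("completed", "failed"):
            return job
        time.sleep(poll_interval)
    raise TimeoutError(f"Job {job_id} did not complete within {timeout}s")
```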

## Model configuration

Each test module defines its own `inference_model` and `embedding_model` fixtures with defaults appropriate to its backend:

| Module | Inference default | Embedding default | Backend |
|--------|-------------------|-------------------|---------|
| `test_inline_evaluation.py` | `ollama/granite3.3:2b` | `ollama/all-minilm:latest` | In-process Ollama (library client) |
| `test_remote_wrappers.py` | `litellm/Mistral-Small-24B-W8A8` | `nomic-ai/nomic-embed-text-v1.5` | Mocked `LlamaStackClient` |
| `test_e2e.py` | `Mistral-Small-24B-W8A8` | `nomic-ai/nomic-embed-text-v1.5` | OpenShift cluster (see `cluster-deployment/manifests/configmap-and-secrets.yaml`) |

The `INFERENCE_MODEL` and `EMBEDDING_MODEL` environment variables override these defaults across all suites. When overriding, ensure the values match the models registered in the target environment — e.g. e2e defaults must match the OpenShift configmap, and inline defaults must use the `ollama/` prefix expected by the library client config.
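A sketch of how such fixtures can resolve the overrides (function and constant names here are illustrative, not the suite's actual code; the defaults shown are the inline suite's):

```python
import os

# Illustrative defaults mirroring the inline suite; each test module
# defines its own fixtures with backend-appropriate values.
DEFAULT_INFERENCE_MODEL = "ollama/granite3.3:2b"
DEFAULT_EMBEDDING_MODEL = "ollama/all-minilm:latest"


def resolve_models() -> tuple[str, str]:
    """Return (inference_model, embedding_model), preferring env overrides."""
    inference = os.environ.get("INFERENCE_MODEL", DEFAULT_INFERENCE_MODEL)
    embedding = os.environ.get("EMBEDDING_MODEL", DEFAULT_EMBEDDING_MODEL)
    return inference, embedding
```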

## Cluster deployment (`cluster-deployment/`)

Contains the Containerfile, deployment/teardown scripts, and Kubernetes manifests needed to stand up the e2e test environment on OpenShift. See `cluster-deployment/deploy-e2e.sh` to deploy.
141 changes: 141 additions & 0 deletions tests/base_eval_tests.py
@@ -0,0 +1,141 @@
"""Shared test helpers for Llama Stack eval provider tests.

Provides ``SmokeTester`` and ``EvalTester``, plain helper classes that
encapsulate common assertions (model/dataset/benchmark registration) and
eval-job execution logic (run, poll, verify scores). Test modules
instantiate them via fixtures, supplying the appropriate client and
configuration for each environment (in-process library client or remote
``LlamaStackClient``).
"""

import time

from rich import print as pprint


class SmokeTester:
    def __init__(self, client, dataset_id, inline_benchmark_id, remote_benchmark_id):
        self.client = client
        self.dataset_id = dataset_id
        self.inline_benchmark_id = inline_benchmark_id
        self.remote_benchmark_id = remote_benchmark_id

    def test_providers_registered(self):
        providers = self.client.providers.list()
        assert len(providers) > 0
        assert any(p.api == "eval" for p in providers)
        pprint("Providers:", providers)
Comment on lines +23 to +27
Contributor:
suggestion (testing): Consider calling test_providers_registered from at least one smoke test to ensure eval providers are discoverable

SmokeTester defines test_providers_registered, but current smoke tests (test_inline_evaluation.py, test_e2e.py) only call the models/datasets/benchmarks checks. This means a misconfigured providers list (e.g., eval API not exposed despite benchmarks) would slip through. Please call smoke_tester.test_providers_registered() in at least one smoke test to cover provider registration end-to-end.

Suggested implementation:

    def test_datasets_registered(self):
        datasets = self.client.beta.datasets.list()
        pprint("Datasets:", datasets)
        dataset_ids = [d.identifier for d in datasets]
        assert self.dataset_id in dataset_ids, (
            f"Dataset '{self.dataset_id}' not found. Available: {dataset_ids}"
        )

    def run_smoke_tests(self):
        """
        Run the minimal set of smoke tests to ensure the eval stack is discoverable.

        This intentionally includes provider registration so that a misconfigured
        providers list (e.g., eval API not exposed despite benchmarks) is caught
        by at least one end-to-end smoke test.
        """
        self.test_providers_registered()
        self.test_models_registered()
        self.test_datasets_registered()

To fully implement the suggestion (calling test_providers_registered from at least one smoke test), you should also:

  1. Update one of the smoke tests (e.g. tests/test_inline_evaluation.py or tests/test_e2e.py) to call run_smoke_tests() instead of individually calling test_models_registered() / test_datasets_registered(). For example, if a test currently does:
    smoke_tester.test_models_registered()
    smoke_tester.test_datasets_registered()
    change it to:
    smoke_tester.run_smoke_tests()
  2. Alternatively, in at least one smoke test, add an explicit call:
    smoke_tester.test_providers_registered()
    if you prefer not to use run_smoke_tests() there.
    These changes ensure that provider registration is exercised in at least one end-to-end smoke test, as requested in your review comment.


    def test_models_registered(self):
        models = self.client.models.list()
        pprint("Models:", models)
        assert len(models) > 0, "No models registered"

    def test_datasets_registered(self):
        datasets = self.client.beta.datasets.list()
        pprint("Datasets:", datasets)
        dataset_ids = [d.identifier for d in datasets]
        assert self.dataset_id in dataset_ids, (
            f"Dataset '{self.dataset_id}' not found. Available: {dataset_ids}"
        )

    def test_benchmarks_registered(self):
        benchmarks = self.client.alpha.benchmarks.list()
        pprint("Benchmarks:", benchmarks)
        benchmark_ids = [b.identifier for b in benchmarks]
        assert self.inline_benchmark_id in benchmark_ids, (
            f"Benchmark '{self.inline_benchmark_id}' not found. Available: {benchmark_ids}"
        )
        assert self.remote_benchmark_id in benchmark_ids, (
            f"Benchmark '{self.remote_benchmark_id}' not found. Available: {benchmark_ids}"
        )


class EvalTester:
"""Base evaluation test class."""

def __init__(
self,
client,
inference_model,
dataset_id,
inline_benchmark_id,
remote_benchmark_id,
poll_interval: int = 5,
poll_timeout: int = 300,
):
self.client = client
self.inference_model = inference_model
self.dataset_id = dataset_id
self.inline_benchmark_id = inline_benchmark_id
self.remote_benchmark_id = remote_benchmark_id
self.poll_interval = poll_interval
self.poll_timeout = poll_timeout

def run_eval(
self,
benchmark_id: str,
inference_model: str,
num_examples: int | None = None,
):
"""Run an evaluation job and verify it completes with scores."""
benchmark_config = self._build_benchmark_config(
inference_model, num_examples=num_examples
)
job = self.client.alpha.eval.run_eval(
benchmark_id=benchmark_id,
benchmark_config=benchmark_config,
)
assert job.job_id is not None
assert job.status == "in_progress"

completed = self._wait_for_job(self.client, benchmark_id, job.job_id)
assert completed.status == "completed", (
f"Job finished with status '{completed.status}'"
)

results = self.client.alpha.eval.jobs.retrieve(
benchmark_id=benchmark_id, job_id=job.job_id
)
pprint(f"[{self.__class__.__name__}] Results:", results)
assert results.scores, "Expected non-empty scores"

# -- helpers --------------------------------------------------------

def _build_benchmark_config(
self, inference_model: str, num_examples: int | None = None
) -> dict:
"""Build the ``benchmark_config`` dict for ``run_eval``."""
config: dict = {
"eval_candidate": {
"type": "model",
"model": inference_model,
"sampling_params": {
"temperature": 0.1,
"max_tokens": 100,
},
},
"scoring_params": {},
}
if num_examples is not None:
config["num_examples"] = num_examples
return config

def _wait_for_job(
self, client, benchmark_id: str, job_id: str, timeout: int | None = None
):
"""Poll until the eval job reaches a terminal state."""
timeout = timeout if timeout is not None else self.poll_timeout
deadline = time.time() + timeout
while time.time() < deadline:
job = client.alpha.eval.jobs.status(
benchmark_id=benchmark_id, job_id=job_id
)
Comment on lines +124 to +133
Contributor:
suggestion (testing): Add a focused unit test for _wait_for_job covering the timeout path

Currently the timeout path that raises TimeoutError isn’t covered. Please add a unit-style test (e.g., with a fake client whose jobs.status always returns in_progress and a very small poll_timeout) to verify that _wait_for_job times out and raises as expected instead of hanging, especially since this helper is reused across library and E2E tests.

Suggested implementation:

    def test_wait_for_job_times_out(self):
        """Ensure _wait_for_job raises TimeoutError when the job never completes."""

        class _InProgressJob:
            def __init__(self):
                self.status = "in_progress"

        class _FakeJobs:
            def status(self, benchmark_id: str, job_id: str):
                # Always return a job that is still in progress to force a timeout
                return _InProgressJob()

        class _FakeEval:
            def __init__(self):
                self.jobs = _FakeJobs()

        class _FakeAlpha:
            def __init__(self):
                self.eval = _FakeEval()

        class _FakeClient:
            def __init__(self):
                self.alpha = _FakeAlpha()

        fake_client = _FakeClient()

        # Use a very small timeout so the test runs quickly while exercising the timeout path
        with pytest.raises(TimeoutError):
            self._wait_for_job(
                client=fake_client,
                benchmark_id="benchmark-id",
                job_id="job-id",
                timeout=0.01,
            )


  1. Ensure pytest is imported at the top of tests/base_eval_tests.py, e.g.:
    import pytest
  2. If TimeoutError is shadowed or wrapped elsewhere in the codebase, adjust the assertion to match the specific exception type used by _wait_for_job.

pprint(f"[{self.__class__.__name__}] Job status:", job)
if job.status in ("completed", "failed"):
return job
time.sleep(self.poll_interval)
raise TimeoutError(
f"Job {job_id} for benchmark {benchmark_id} "
f"did not complete within {timeout}s"
)