Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions tools/harness/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,37 @@ uv run nv-ingest-harness-run --case=e2e --dataset=bo767,earnings,bo20

# Custom path still works (uses active section config)
uv run nv-ingest-harness-run --case=e2e --dataset=/custom/path

# List available datasets and groups
uv run nv-ingest-harness-run --list-datasets
```

#### Dataset Groups

Run multiple related datasets with a single command using dataset groups:

```yaml
# In test_configs.yaml
dataset_groups:
vidore: # All 8 Vidore V3 benchmark datasets
- vidore_v3_finance_en
- vidore_v3_industrial
- ...
vidore_quick: # Quick test with smallest datasets
- vidore_v3_hr
- vidore_v3_industrial
```

**Usage:**
```bash
# Run all Vidore datasets
uv run nv-ingest-harness-run --case=e2e_recall --dataset=vidore

# Run quick test (smallest 2 datasets)
uv run nv-ingest-harness-run --case=e2e_recall --dataset=vidore_quick

# Mix groups and individual datasets
uv run nv-ingest-harness-run --case=e2e --dataset=vidore_quick,bo20
```

**Dataset Extraction Settings:**
Expand Down
2 changes: 2 additions & 0 deletions tools/harness/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"beir>=2.0.0",
"click>=8.1.8",
"datasets>=2.0.0",
"docker>=7.1.0",
"pyyaml>=6.0",
"requests>=2.31.0",
Expand Down
54 changes: 44 additions & 10 deletions tools/harness/src/nv_ingest_harness/cases/e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ def main(config=None, log_path: str = "test_results") -> int:
extract_charts = config.extract_charts
extract_images = config.extract_images
extract_infographics = config.extract_infographics
extract_page_as_image = config.extract_page_as_image
extract_method = config.extract_method
text_depth = config.text_depth
table_output_format = config.table_output_format
image_elements_modality = config.image_elements_modality

# Optional pipeline steps
enable_caption = config.enable_caption
Expand All @@ -80,6 +83,11 @@ def main(config=None, log_path: str = "test_results") -> int:

model_name, dense_dim = embed_info()

# Deployment fingerprint - detect silent fallback to wrong model
if dense_dim == 1024:
print("WARNING: Embedding model returned dim=1024 (nv-embedqa-e5-v5 fallback)")
print("WARNING: Expected dim=2048 for multimodal embed. Check embedding NIM status.")

# Log configuration for transparency
print("=== Test Configuration ===")
print(f"Dataset: {data_dir}")
Expand Down Expand Up @@ -155,15 +163,20 @@ def main(config=None, log_path: str = "test_results") -> int:
ingestor = ingestor.pdf_split_config(pages_per_chunk=pdf_split_page_count)

# Extraction step
ingestor = ingestor.extract(
extract_text=extract_text,
extract_tables=extract_tables,
extract_charts=extract_charts,
extract_images=extract_images,
text_depth=text_depth,
table_output_format=table_output_format,
extract_infographics=extract_infographics,
)
extract_kwargs = {
"extract_text": extract_text,
"extract_tables": extract_tables,
"extract_charts": extract_charts,
"extract_images": extract_images,
"text_depth": text_depth,
"table_output_format": table_output_format,
"extract_infographics": extract_infographics,
}
if extract_page_as_image:
extract_kwargs["extract_page_as_image"] = True
if extract_method:
extract_kwargs["extract_method"] = extract_method
ingestor = ingestor.extract(**extract_kwargs)

# Optional pipeline steps
if enable_caption:
Expand All @@ -181,7 +194,10 @@ def main(config=None, log_path: str = "test_results") -> int:
)

# Embed (must come before storage per pipeline ordering)
ingestor = ingestor.embed(model_name=model_name)
embed_kwargs = {"model_name": model_name}
if image_elements_modality:
embed_kwargs["image_elements_modality"] = image_elements_modality
ingestor = ingestor.embed(**embed_kwargs)

# Store images to disk (server-side image storage) - optional
# Note: Supports both MinIO (s3://) and local disk (file://) via storage_uri
Expand Down Expand Up @@ -245,6 +261,24 @@ def main(config=None, log_path: str = "test_results") -> int:
# Optional: log chunk stats and per-type breakdown
if vdb_backend != "lancedb":
milvus_chunks(f"http://{hostname}:19530", collection_name)
# Verify collection vector dimension matches expected
try:
from pymilvus import MilvusClient

mc = MilvusClient(uri=f"http://{hostname}:19530")
col_info = mc.describe_collection(collection_name)
for field in col_info.get("fields", []):
params = field.get("params", {})
if "dim" in params:
actual_dim = int(params["dim"])
if actual_dim != dense_dim:
print(f"WARNING: Collection vector dim={actual_dim} != expected dim={dense_dim}")
print("WARNING: Collection may have been created with a different embedding model")
else:
print(f"Collection vector dim={actual_dim} matches expected dim={dense_dim}")
mc.close()
except Exception as e:
print(f"Could not verify collection schema: {e}")
text_results, table_results, chart_results = segment_results(results)
kv_event_log("text_chunks", sum(len(x) for x in text_results), log_path)
kv_event_log("table_chunks", sum(len(x) for x in table_results), log_path)
Expand Down
67 changes: 58 additions & 9 deletions tools/harness/src/nv_ingest_harness/cases/recall.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def evaluate_recall_with_reranker(
evaluation_params: Dict,
use_reranker: bool,
log_path: str = "test_results",
) -> Tuple[Dict[int, float], float]:
) -> Tuple[Dict, float]:
"""
Run recall evaluation with specified reranker setting.

Expand All @@ -30,32 +30,51 @@ def evaluate_recall_with_reranker(
log_path: Path for logging output

Returns:
Tuple of (scores_dict, elapsed_time)
Tuple of (results_dict, elapsed_time)
results_dict may be {k: score} or {"recall": {...}, "beir": {...}} if BEIR enabled
"""
mode_str = "with reranker" if use_reranker else "without reranker"
print("\n" + "=" * 60)
print(f"Running Recall Evaluation ({mode_str})")
print("=" * 60)

eval_start = time.time()
scores = evaluator(
results = evaluator(
collection_name=collection_name,
nv_ranker=use_reranker,
**evaluation_params,
)
eval_time = time.time() - eval_start

# Log results
# Handle both old format {k: score} and new format {"recall": {...}, "beir": {...}}
if isinstance(results, dict) and "recall" in results:
recall_scores = results["recall"]
beir_metrics = results.get("beir")
else:
recall_scores = results
beir_metrics = None

# Log recall results
reranker_suffix = "with" if use_reranker else "no"
print(f"\nMultimodal Recall ({mode_str}):")
for k in sorted(scores.keys()):
score = scores[k]
for k in sorted(recall_scores.keys()):
score = recall_scores[k]
print(f" - Recall @{k}: {score:.3f}")
reranker_suffix = "with" if use_reranker else "no"
kv_event_log(f"recall_multimodal_@{k}_{reranker_suffix}_reranker", score, log_path)

kv_event_log(f"recall_eval_time_s_{'with' if use_reranker else 'no'}_reranker", eval_time, log_path)
# Log BEIR metrics if available
if beir_metrics:
print(f"\nBEIR Metrics ({mode_str}):")
for metric_name, values in beir_metrics.items():
for k_str, score in values.items():
print(f" - {k_str}: {score:.5f}")
# Log with format: ndcg_10_no_reranker
k_num = k_str.split("@")[1] if "@" in k_str else k_str
kv_event_log(f"{metric_name}_{k_num}_{reranker_suffix}_reranker", score, log_path)

kv_event_log(f"recall_eval_time_s_{reranker_suffix}_reranker", eval_time, log_path)

return scores, eval_time
return results, eval_time


def main(config=None, log_path: str = "test_results") -> int:
Expand All @@ -69,6 +88,11 @@ def main(config=None, log_path: str = "test_results") -> int:
gpu_search = config.gpu_search
model_name, dense_dim = embed_info()

# Deployment fingerprint - detect silent fallback to wrong model
if dense_dim == 1024:
print("WARNING: Embedding model returned dim=1024 (nv-embedqa-e5-v5 fallback)")
print("WARNING: Expected dim=2048 for multimodal embed. Check embedding NIM status.")

# Recall-specific configuration with defaults
reranker_mode = getattr(config, "reranker_mode", "none")
recall_top_k = getattr(config, "recall_top_k", 10)
Expand Down Expand Up @@ -126,6 +150,27 @@ def main(config=None, log_path: str = "test_results") -> int:
if lancedb_path:
print(f"Using LanceDB at: {lancedb_path}")

# Verify collection schema if using Milvus
if vdb_backend == "milvus":
try:
from pymilvus import MilvusClient

verify_uri = f"http://{hostname}:19530"
mc = MilvusClient(uri=verify_uri)
col_info = mc.describe_collection(collection_name)
for field in col_info.get("fields", []):
params = field.get("params", {})
if "dim" in params:
actual_dim = int(params["dim"])
if actual_dim != dense_dim:
print(f"WARNING: Collection vector dim={actual_dim} != embed model dim={dense_dim}")
print("WARNING: Collection may have been created with a different embedding model")
else:
print(f"Collection vector dim={actual_dim} matches embed model dim={dense_dim}")
mc.close()
except Exception as e:
print(f"Could not verify collection schema: {e}")

try:
recall_results = {}

Expand All @@ -141,7 +186,11 @@ def main(config=None, log_path: str = "test_results") -> int:
"vdb_backend": vdb_backend,
"nv_ranker_endpoint": f"http://{hostname}:8020/v1/ranking",
"nv_ranker_model_name": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
"enable_beir": config.enable_beir,
}
language_filter = getattr(config, "language_filter", None)
if language_filter and recall_dataset.startswith("vidore_"):
evaluation_params["language_filter"] = language_filter
if vdb_backend == "lancedb":
evaluation_params["table_path"] = lancedb_path

Expand Down
5 changes: 4 additions & 1 deletion tools/harness/src/nv_ingest_harness/cli/nightly.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -348,7 +349,9 @@ def main(
service_manager.stop()
return 1

print("Services ready!")
# Warm-up: let services stabilize and connect before running tests
print("Services ready! Sleeping 60s for warm-up...")
time.sleep(60)

all_results = []

Expand Down
63 changes: 58 additions & 5 deletions tools/harness/src/nv_ingest_harness/cli/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import sys
import time
from pathlib import Path

import click
Expand Down Expand Up @@ -69,6 +70,10 @@ def run_datasets(
service_manager.stop()
return 1

# Warm-up: let services stabilize and connect before running tests
print("Services ready! Sleeping 60s for warm-up...")
time.sleep(60)

# Run each dataset
for dataset_name in dataset_list:
print(f"\n{'='*60}")
Expand Down Expand Up @@ -279,14 +284,17 @@ def __init__(self, file_path, original_stream):

def write(self, data):
self.original.write(data)
self.file.write(data)
if not self.file.closed:
self.file.write(data)

def flush(self):
self.original.flush()
self.file.flush()
if not self.file.closed:
self.file.flush()

def close(self):
self.file.close()
if not self.file.closed:
self.file.close()

tee_stdout = TeeFile(stdout_path, sys.stdout)
old_stdout = sys.stdout
Expand Down Expand Up @@ -369,6 +377,11 @@ def close(self):
default=None,
help="Path to test config YAML (default: tools/harness/test_configs.yaml)",
)
@click.option(
"--list-datasets",
is_flag=True,
help="List available datasets and groups, then exit",
)
def main(
case,
managed,
Expand All @@ -382,14 +395,54 @@ def main(
sku,
dump_logs,
test_config_path,
list_datasets,
):
# Handle --list-datasets
if list_datasets:
from nv_ingest_harness.config import list_datasets as get_datasets

config_file = test_config_path or str(Path(__file__).resolve().parents[3] / "test_configs.yaml")
info = get_datasets(config_file=config_file)

print("Available Datasets:")
print("-" * 50)
for name, config in sorted(info["datasets"].items()):
if isinstance(config, dict):
path = config.get("path", "N/A")
recall = config.get("recall_dataset")
recall_str = f" [recall: {recall}]" if recall else ""
else:
path = config
recall_str = ""
print(f" {name}: {path}{recall_str}")

if info.get("groups"):
print("\nDataset Groups:")
print("-" * 50)
for name, members in sorted(info["groups"].items()):
print(f" {name} ({len(members)} datasets):")
for m in members:
print(f" - {m}")

return 0

if not dataset:
print("Error: --dataset is required. Use --dataset=<name> or --dataset=<name1>,<name2>", file=sys.stderr)
print(" Use --list-datasets to see available datasets and groups", file=sys.stderr)
return 1

# Parse dataset(s) - handle both single and comma-separated
dataset_list = [d.strip() for d in dataset.split(",") if d.strip()]
# Parse dataset(s) - handle single, comma-separated, and groups
import yaml

config_path = (
Path(test_config_path) if test_config_path else Path(__file__).resolve().parents[3] / "test_configs.yaml"
)
with open(config_path) as f:
yaml_data = yaml.safe_load(f)

from nv_ingest_harness.config import expand_dataset_names

dataset_list = expand_dataset_names(yaml_data, dataset)
if not dataset_list:
print("Error: No valid datasets found", file=sys.stderr)
return 1
Expand Down
Loading
Loading