Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nemo_retriever/src/nemo_retriever/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

from nemo_retriever.ingestor import Ingestor, RunMode, create_ingestor

__all__ = ["Ingestor", "RunMode", "create_ingestor"]
__all__ = ["RunMode", "create_ingestor"]
20 changes: 19 additions & 1 deletion nemo_retriever/src/nemo_retriever/graph/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from nemo_retriever.utils.ray_resource_hueristics import (
gather_cluster_resources,
gather_local_resources,
NEMOTRON_PARSE_BATCH_SIZE,
NEMOTRON_PARSE_GPUS_PER_ACTOR,
OCR_GPUS_PER_ACTOR,
)

Expand Down Expand Up @@ -256,6 +258,15 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:
batch_format = overrides.pop("batch_format", self._default_batch_format)
num_cpus = overrides.pop("num_cpus", self._default_num_cpus)

# NemotronParseGPUActor uses vLLM which handles its own batching
# efficiently, so feed it more rows per map_batches call.
from nemo_retriever.parse.nemotron_parse import NemotronParseActor, NemotronParseGPUActor

if batch_size == self._default_batch_size and issubclass(
node.operator_class, (NemotronParseActor, NemotronParseGPUActor)
):
batch_size = NEMOTRON_PARSE_BATCH_SIZE

# When no explicit num_gpus override is given, auto-detect from the
# GPUOperator mixin using actual cluster GPU availability.
if "num_gpus" in overrides:
Expand All @@ -279,7 +290,14 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:
elif available_gpus > 0:
# Local model, GPUs present: assign the heuristic fraction so
# Ray can co-schedule multiple actors per GPU.
num_gpus = max(self._default_num_gpus, _DEFAULT_GPU_OPERATOR_NUM_GPUS)
# Exception: NemotronParseGPUActor uses vLLM which manages
# its own KV-cache and requires exclusive GPU access.
from nemo_retriever.parse.nemotron_parse import NemotronParseActor, NemotronParseGPUActor

if issubclass(node.operator_class, (NemotronParseActor, NemotronParseGPUActor)):
num_gpus = max(self._default_num_gpus, NEMOTRON_PARSE_GPUS_PER_ACTOR)
else:
num_gpus = max(self._default_num_gpus, _DEFAULT_GPU_OPERATOR_NUM_GPUS)
else:
# No GPUs in the cluster — operator will likely fail to load
# its CUDA model. Warn loudly rather than silently requesting
Expand Down
1 change: 1 addition & 0 deletions nemo_retriever/src/nemo_retriever/graph/gpu_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ class MyGPUActor(AbstractOperator, GPUOperator):

Executors can inspect ``isinstance(op, GPUOperator)`` to allocate
GPU resources or route work to GPU-capable workers.

"""
11 changes: 10 additions & 1 deletion nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
explode_content_to_rows,
)
from nemo_retriever.graph.multi_type_extract_operator import MultiTypeExtractOperator
from nemo_retriever.ocr.ocr import NemotronParseActor, OCRActor
from nemo_retriever.ocr.ocr import OCRActor
from nemo_retriever.parse.nemotron_parse import NemotronParseActor
from nemo_retriever.page_elements.page_elements import PageElementDetectionActor
from nemo_retriever.pdf.extract import PDFExtractionActor
from nemo_retriever.pdf.split import PDFSplitActor
Expand Down Expand Up @@ -265,14 +266,22 @@ def build_graph(
}

if parse_mode:
# PDF extraction renders pages to images required by Nemotron Parse.
extract_kwargs["extract_page_as_image"] = True
graph = graph >> PDFExtractionActor(**extract_kwargs)

parse_kwargs: dict[str, Any] = {
"extract_text": extract_params.extract_text,
"extract_tables": extract_params.extract_tables,
"extract_charts": extract_params.extract_charts,
"extract_infographics": extract_params.extract_infographics,
}
if extract_params.invoke_url:
parse_kwargs["invoke_url"] = extract_params.invoke_url
if extract_params.api_key:
parse_kwargs["api_key"] = extract_params.api_key
if extract_params.nemotron_parse_model:
parse_kwargs["nemotron_parse_model"] = extract_params.nemotron_parse_model
graph = graph >> NemotronParseActor(**parse_kwargs)
else:
detect_kwargs: dict[str, Any] = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from nemo_retriever.html.ray_data import HtmlSplitActor
from nemo_retriever.image.ray_data import ImageLoadActor
from nemo_retriever.image.load import SUPPORTED_IMAGE_EXTENSIONS
from nemo_retriever.ocr.ocr import NemotronParseActor, OCRActor
from nemo_retriever.ocr.ocr import OCRActor
from nemo_retriever.parse.nemotron_parse import NemotronParseActor
from nemo_retriever.page_elements.page_elements import PageElementDetectionActor
from nemo_retriever.params import ASRParams
from nemo_retriever.params import AudioChunkParams
Expand Down
196 changes: 128 additions & 68 deletions nemo_retriever/src/nemo_retriever/model/local/nemotron_parse_v1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import annotations

from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, List, Optional, Sequence, Union

import numpy as np
import torch
Expand All @@ -15,65 +15,118 @@
from nemo_retriever.utils.hf_model_registry import get_hf_revision
from ..model import BaseModel, RunMode

# Type alias for all supported single-image input formats.
ImageInput = Union[torch.Tensor, np.ndarray, Image.Image, str, Path]


# ---------------------------------------------------------------------------
# vLLM processor bug workaround
# ---------------------------------------------------------------------------
# vLLM's bundled NemotronParseProcessor.__call__ passes add_special_tokens=False
# explicitly to the tokenizer AND also forwards it via **kwargs from the vLLM
# pipeline, causing a duplicate keyword argument TypeError. We monkey-patch
# the processor class at import time to pop the conflicting kwarg.

_VLLM_PROCESSOR_PATCHED = False


def _patch_vllm_nemotron_parse_processor() -> None:
"""Fix duplicate-kwarg bug in vLLM's NemotronParseProcessor.__call__."""
global _VLLM_PROCESSOR_PATCHED
if _VLLM_PROCESSOR_PATCHED:
return

try:
from vllm.model_executor.models.nemotron_parse import NemotronParseProcessor
except ImportError:
return

_orig_call = NemotronParseProcessor.__call__

def _fixed_call(self, text=None, images=None, return_tensors=None, **kwargs):
kwargs.pop("add_special_tokens", None)
return _orig_call(self, text=text, images=images, return_tensors=return_tensors, **kwargs)

NemotronParseProcessor.__call__ = _fixed_call
_VLLM_PROCESSOR_PATCHED = True


# ---------------------------------------------------------------------------
# Model wrapper
# ---------------------------------------------------------------------------


class NemotronParseV12(BaseModel):
"""
NVIDIA Nemotron Parse v1.2 local wrapper.
NVIDIA Nemotron Parse v1.2 local wrapper backed by vLLM.

This wrapper loads `nvidia/NVIDIA-Nemotron-Parse-v1.2` from Hugging Face and
runs image-to-structured-text generation for document parsing.
This wrapper loads ``nvidia/NVIDIA-Nemotron-Parse-v1.2`` via vLLM's offline
``LLM`` engine for image-to-structured-text generation (document parsing).
vLLM handles KV-cache management, continuous batching, and GPU scheduling
internally, avoiding the transformers cache-API incompatibility that affects
the HuggingFace ``trust_remote_code`` model code with transformers >= 4.52.
"""

_DEFAULT_TASK_PROMPT: str = "</s><s><predict_bbox><predict_classes><output_markdown><predict_no_text_in_pic>"

def __init__(
self,
model_path: str = "nvidia/NVIDIA-Nemotron-Parse-v1.2",
device: Optional[str] = None,
hf_cache_dir: Optional[str] = None,
task_prompt: str = "</s><s><predict_bbox><predict_classes><output_markdown><predict_no_text_in_pic>",
task_prompt: str = _DEFAULT_TASK_PROMPT,
gpu_memory_utilization: float = 0.8,
max_num_seqs: int = 64,
max_tokens: int = 9000,
) -> None:
super().__init__()

from transformers import AutoModel, AutoProcessor, AutoTokenizer, GenerationConfig
try:
from vllm import LLM, SamplingParams # noqa: F401
except ImportError as e:
raise ImportError(
"Local Nemotron Parse requires vLLM. " 'Install with: pip install "nemo-retriever[vllm]"'
) from e

_patch_vllm_nemotron_parse_processor()

self._model_path = model_path
self._task_prompt = task_prompt
self._device = torch.device(device or ("cuda:0" if torch.cuda.is_available() else "cpu"))
self._dtype = torch.bfloat16 if self._device.type == "cuda" else torch.float32
hf_cache_dir = configure_global_hf_cache_base(hf_cache_dir)
_revision = get_hf_revision(self._model_path)

self._model = AutoModel.from_pretrained(
self._model_path,
revision=_revision,
trust_remote_code=True,
torch_dtype=self._dtype,
cache_dir=hf_cache_dir,
).to(self._device)
self._model.eval()

self._tokenizer = AutoTokenizer.from_pretrained(
self._model_path,
revision=_revision,
cache_dir=hf_cache_dir,
trust_remote_code=True,
)
self._processor = AutoProcessor.from_pretrained(
self._model_path,
revision=_revision,
self._max_tokens = max_tokens

if device is not None:
import os

dev_id = device.split(":")[-1] if ":" in device else device
os.environ["CUDA_VISIBLE_DEVICES"] = dev_id

configure_global_hf_cache_base(hf_cache_dir)
revision = get_hf_revision(model_path)

self._llm = LLM(
model=model_path,
revision=revision,
trust_remote_code=True,
cache_dir=hf_cache_dir,
dtype="bfloat16",
max_num_seqs=max_num_seqs,
limit_mm_per_prompt={"image": 1},
gpu_memory_utilization=gpu_memory_utilization,
)
self._generation_config = GenerationConfig.from_pretrained(
self._model_path,
revision=_revision,
trust_remote_code=True,
cache_dir=hf_cache_dir,

self._sampling_params = SamplingParams(
temperature=0,
top_k=1,
repetition_penalty=1.1,
max_tokens=self._max_tokens,
skip_special_tokens=False,
)

def preprocess(self, input_data: Union[torch.Tensor, np.ndarray, Image.Image, str, Path]) -> Image.Image:
"""
Normalize supported input formats to a RGB PIL image.
"""
# ------------------------------------------------------------------
# Input normalisation
# ------------------------------------------------------------------

def preprocess(self, input_data: ImageInput) -> Image.Image:
"""Normalize supported input formats to an RGB PIL image."""
if isinstance(input_data, Image.Image):
return input_data.convert("RGB")

Expand Down Expand Up @@ -116,57 +169,68 @@ def preprocess(self, input_data: Union[torch.Tensor, np.ndarray, Image.Image, st

raise TypeError(f"Unsupported input type for Nemotron Parse: {type(input_data)!r}")

# ------------------------------------------------------------------
# Inference
# ------------------------------------------------------------------

def invoke(
    self,
    input_data: ImageInput,
    task_prompt: Optional[str] = None,
) -> str:
    """Run Nemotron Parse on a single image and return the decoded text.

    Thin convenience wrapper: delegates to :meth:`invoke_batch` with a
    one-element batch and unwraps the single result.

    Args:
        input_data: Image in any supported format (tensor, ndarray, PIL
            image, or filesystem path); see :meth:`preprocess`.
        task_prompt: Optional prompt override; defaults to the task prompt
            configured at construction time.

    Returns:
        The decoded model output text for the input image.
    """
    return self.invoke_batch([input_data], task_prompt=task_prompt)[0]
def invoke_batch(
    self,
    inputs: Sequence[ImageInput],
    task_prompt: Optional[str] = None,
) -> List[str]:
    """Run Nemotron Parse on a batch of images via vLLM.

    vLLM handles continuous batching and GPU scheduling internally,
    making this significantly faster than sequential single-image calls
    for large batches.

    Args:
        inputs: Images in any supported format; each is normalized with
            :meth:`preprocess`.
        task_prompt: Optional decoder prompt override; defaults to the
            prompt configured at construction time.

    Returns:
        One decoded, whitespace-stripped output string per input image,
        in input order.
    """
    prompt = task_prompt or self._task_prompt
    # Nemotron Parse is an encoder-decoder VLM: the image goes to the
    # encoder, the task prompt to the decoder.
    prompts = [
        {
            "encoder_prompt": {
                "prompt": "",
                "multi_modal_data": {"image": self.preprocess(img)},
            },
            "decoder_prompt": prompt,
        }
        for img in inputs
    ]
    outputs = self._llm.generate(prompts, self._sampling_params)
    return [out.outputs[0].text.strip() for out in outputs]

def __call__(
    self,
    input_data: ImageInput,
    task_prompt: Optional[str] = None,
) -> str:
    """Make the wrapper directly callable; alias for :meth:`invoke`."""
    return self.invoke(input_data, task_prompt=task_prompt)

# ------------------------------------------------------------------
# BaseModel abstract interface
# ------------------------------------------------------------------

@property
def model_name(self) -> str:
    """Human-readable identifier of the wrapped Hugging Face model."""
    return "NVIDIA-Nemotron-Parse-v1.2"

@property
def model_type(self) -> str:
    """Model category: document parsing (image to structured text)."""
    return "document-parse"

@property
def model_runmode(self) -> RunMode:
    """Execution mode: always "local" for this in-process vLLM wrapper
    (as opposed to NIM or build-endpoint deployments)."""
    return "local"

@property
def input(self) -> Any:
"""
Input schema for the model.
"""
return {
"type": "image",
"format": "RGB",
Expand All @@ -176,9 +240,6 @@ def input(self) -> Any:

@property
def output(self) -> Any:
"""
Output schema for the model.
"""
return {
"type": "text",
"format": "string",
Expand All @@ -187,5 +248,4 @@ def output(self) -> Any:

@property
def input_batch_size(self) -> int:
    """Default number of images to feed per inference call.

    vLLM batches requests internally, so a comparatively large batch
    keeps the engine busy; 64 mirrors the ``max_num_seqs`` default used
    at construction.
    """
    return 64
Loading
Loading