diff --git a/nemo_retriever/src/nemo_retriever/html/ray_data.py b/nemo_retriever/src/nemo_retriever/html/ray_data.py index f2dafbd80..1c87a18b7 100644 --- a/nemo_retriever/src/nemo_retriever/html/ray_data.py +++ b/nemo_retriever/src/nemo_retriever/html/ray_data.py @@ -35,12 +35,14 @@ def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame: out_dfs: List[pd.DataFrame] = [] for _, row in batch_df.iterrows(): raw = row.get("bytes") + text = row.get("text") path = row.get("path") - if raw is None or path is None: + if (raw is None and text is None) or path is None: continue path_str = str(path) if path is not None else "" try: - chunk_df = html_bytes_to_chunks_df(raw, path_str, params=params) + payload = raw or text.encode("utf-8") + chunk_df = html_bytes_to_chunks_df(payload, path_str, params=params) if not chunk_df.empty: out_dfs.append(chunk_df) except Exception: diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py b/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py index 34eaf7ed5..90e230cba 100644 --- a/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py @@ -1001,7 +1001,12 @@ def extract(self, params: ExtractParams | None = None, **kwargs: Any) -> "InProc # NOTE: `kwargs` passed to `.extract()` are intended primarily for PDF extraction # (e.g. `extract_text`, `dpi`, etc). Downstream model stages do NOT necessarily # accept the same keyword arguments. Keep per-stage kwargs isolated. - + if self._input_documents and all(f.lower().endswith(".txt") for f in self._input_documents): + txt_params = TextChunkParams() + return self.extract_txt(params=txt_params) + if self._input_documents and all(f.lower().endswith(".html") for f in self._input_documents): + html_params = HtmlChunkParams() + return self.extract_html(params=html_params) resolved = _coerce_params(params, ExtractParams, kwargs) if ( any( @@ -1289,9 +1294,13 @@ def extract_txt(self, params: TextChunkParams | None = None, **kwargs: Any) -> " Use with .files("*.txt").extract_txt(...).embed().vdb_upload().ingest(). Do not call .extract() when using .extract_txt(). """ + from nemo_retriever.txt.ray_data import TxtSplitActor + self._pipeline_type = "txt" resolved = _coerce_params(params, TextChunkParams, kwargs) self._extract_txt_kwargs = resolved.model_dump(mode="python") + text_split = TxtSplitActor(params=TextChunkParams(**self._extract_txt_kwargs)) + self._tasks.append((text_split, {})) return self def extract_html(self, params: HtmlChunkParams | None = None, **kwargs: Any) -> "InProcessIngestor": @@ -1301,9 +1310,15 @@ def extract_html(self, params: HtmlChunkParams | None = None, **kwargs: Any) -> Use with .files("*.html").extract_html(...).embed().vdb_upload().ingest(). Do not call .extract() when using .extract_html(). """ + from nemo_retriever.html.ray_data import HtmlSplitActor + self._pipeline_type = "html" resolved = _coerce_params(params, HtmlChunkParams, kwargs) self._extract_html_kwargs = resolved.model_dump(mode="python") + html_split = HtmlSplitActor( + params=HtmlChunkParams(**self._extract_html_kwargs), + ) + self._tasks.append((html_split, {})) return self def extract_audio( diff --git a/nemo_retriever/src/nemo_retriever/txt/ray_data.py b/nemo_retriever/src/nemo_retriever/txt/ray_data.py index f01191814..b74f482cd 100644 --- a/nemo_retriever/src/nemo_retriever/txt/ray_data.py +++ b/nemo_retriever/src/nemo_retriever/txt/ray_data.py @@ -58,12 +58,14 @@ def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame: out_dfs: List[pd.DataFrame] = [] for _, row in batch_df.iterrows(): raw = row.get("bytes") + text = row.get("text") path = row.get("path") - if raw is None or path is None: + if (raw is None and text is None) or path is None: continue path_str = str(path) if path is not None else "" try: - chunk_df = txt_bytes_to_chunks_df(raw, path_str, params=params) + payload = raw or text.encode("utf-8") + chunk_df = txt_bytes_to_chunks_df(payload, path_str, params=params) if not chunk_df.empty: out_dfs.append(chunk_df) except Exception: