diff --git a/lazyllm/tools/rag/__init__.py b/lazyllm/tools/rag/__init__.py index ec4489dab..fa1f59bab 100644 --- a/lazyllm/tools/rag/__init__.py +++ b/lazyllm/tools/rag/__init__.py @@ -11,7 +11,7 @@ CharacterSplitter, RecursiveSplitter, MarkdownSplitter, CodeSplitter, JSONSplitter, YAMLSplitter, HTMLSplitter, XMLSplitter, GeneralCodeSplitter, JSONLSplitter) from .similarity import register_similarity -from .doc_node import DocNode +from .doc_node import DocNode, RichDocNode from .readers import (PDFReader, DocxReader, HWPReader, PPTXReader, ImageReader, IPYNBReader, EpubReader, MarkdownReader, MboxReader, PandasCSVReader, PandasExcelReader, VideoAudioReader, MineruPDFReader) @@ -46,6 +46,7 @@ 'register_similarity', 'register_reranker', 'DocNode', + 'RichDocNode', 'PDFReader', 'DocxReader', 'HWPReader', diff --git a/lazyllm/tools/rag/doc_impl.py b/lazyllm/tools/rag/doc_impl.py index 2370329ec..0792a070f 100644 --- a/lazyllm/tools/rag/doc_impl.py +++ b/lazyllm/tools/rag/doc_impl.py @@ -192,11 +192,9 @@ def _lazy_init(self) -> None: self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name, new_status=DocListManager.Status.success) if self._dlm: - self._init_monitor_event = threading.Event() self._daemon = threading.Thread(target=self.worker) self._daemon.daemon = True self._daemon.start() - self._init_monitor_event.wait() def _resolve_index_pending_registrations(self): for index_type, index_cls, index_args, index_kwargs in self._index_pending_registrations: @@ -298,13 +296,11 @@ def add_reader(self, pattern: str, func: Optional[Callable] = None): self._local_file_reader[pattern] = func self._reader._lazy_init.flag.reset() - def _add_doc_to_store_with_status(self, input_files: List[str], ids: List[str], metadatas: List[Dict[str, Any]], - cond_status_list: Optional[List[str]] = None): + def _add_doc_to_store(self, input_files: List[str], ids: List[str], metadatas: List[Dict[str, Any]]): success_ids, failed_ids = [], [] for filepath, doc_id, metadata in zip(input_files, ids or repeat(None), metadatas or repeat(None)): try: - self._add_doc_to_store(input_files=[filepath], ids=[doc_id] if doc_id is not None else None, - metadatas=[metadata] if metadata is not None else None) + self._processor.add_doc([filepath], [doc_id], [metadata] if metadata is not None else None) success_ids.append(doc_id) except Exception as e: LOG.error(f'Error adding document {doc_id} ({filepath}) to store: {e}') @@ -312,10 +308,10 @@ def _add_doc_to_store_with_status(self, input_files: List[str], ids: List[str], if success_ids: self._dlm.update_kb_group(cond_file_ids=success_ids, cond_group=self._kb_group_name, - cond_status_list=cond_status_list, new_status=DocListManager.Status.success) + new_status=DocListManager.Status.success) if failed_ids: self._dlm.update_kb_group(cond_file_ids=failed_ids, cond_group=self._kb_group_name, - cond_status_list=cond_status_list, new_status=DocListManager.Status.failed) + new_status=DocListManager.Status.failed) def _batch_call(self, func: Callable, *args, batch_size: int = 10, **kwargs): batch_count = next((len(arg) for arg in args if isinstance(arg, (tuple, list))), 0) @@ -323,7 +319,6 @@ def _batch_call(self, func: Callable, *args, batch_size: int = 10, **kwargs): func(*[arg[i:i + batch_size] if isinstance(arg, (list, tuple)) else arg for arg in args], **kwargs) def worker(self): - is_first_run = True while True: # Apply meta changes rows = self._dlm.fetch_docs_changed_meta(self._kb_group_name) @@ -341,7 +336,7 @@ def worker(self): self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name, new_status=DocListManager.Status.working, new_need_reparse=False) self._delete_doc_from_store(doc_ids=ids) - self._batch_call(self._add_doc_to_store_with_status, filepaths, ids, metadatas, batch_size=10) + self._batch_call(self._add_doc_to_store, filepaths, ids, metadatas, batch_size=10) # Step 2: After doc is deleted from related kb_group, delete doc from db if self._kb_group_name == DocListManager.DEFAULT_GROUP_NAME: @@ -359,12 +354,8 @@ def worker(self): if files: self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name, new_status=DocListManager.Status.working) - self._batch_call(self._add_doc_to_store_with_status, - files, ids, metadatas, cond_status_list=[DocListManager.Status.working]) + self._batch_call(self._add_doc_to_store, files, ids, metadatas) - if is_first_run: - self._init_monitor_event.set() - is_first_run = False time.sleep(10) def _list_files( @@ -381,11 +372,6 @@ def _list_files( metadatas.append(json.loads(row[3]) if row[3] else {}) return ids, paths, metadatas - def _add_doc_to_store(self, input_files: List[str], ids: Optional[List[str]] = None, - metadatas: Optional[List[Dict[str, Any]]] = None): - if not input_files: return - self._processor.add_doc(input_files, ids, metadatas) - def _delete_doc_from_store(self, doc_ids: List[str] = None) -> None: self._processor.delete_doc(doc_ids=doc_ids) diff --git a/lazyllm/tools/rag/doc_to_db/extractor.py b/lazyllm/tools/rag/doc_to_db/extractor.py index 089692250..8f672d486 100644 --- a/lazyllm/tools/rag/doc_to_db/extractor.py +++ b/lazyllm/tools/rag/doc_to_db/extractor.py @@ -13,7 +13,7 @@ from lazyllm import LOG, ThreadPoolExecutor, once_wrapper from lazyllm.components import JsonFormatter -from lazyllm.module import LLMBase +from lazyllm.module import LLMBase, ModuleBase from ...sql.sql_manager import DBStatus, SqlManager from ..doc_node import DocNode @@ -33,7 +33,7 @@ ONE_DOC_LENGTH_LIMIT = 102400 -class SchemaExtractor: +class SchemaExtractor(ModuleBase): '''Schema aware extractor that materializes BaseModel schemas into database tables.''' TABLE_PREFIX = 'lazyllm_schema' @@ -724,8 +724,8 @@ def _get_extract_data(self, algo_id: str, doc_ids: List[str], # noqa: C901 results.append(ExtractResult(data=row_data, metadata=meta)) return results - def __call__(self, data: Union[str, List[DocNode]], - algo_id: str = DocListManager.DEFAULT_GROUP_NAME) -> ExtractResult: + def forward(self, data: Union[str, List[DocNode]], + algo_id: str = DocListManager.DEFAULT_GROUP_NAME) -> ExtractResult: # NOTE: data should be from single file source (kb_id, doc_id should be the same) self._lazy_init() res = self.extract_and_store(data=data, algo_id=algo_id) diff --git a/lazyllm/tools/rag/document.py b/lazyllm/tools/rag/document.py index 0b395e050..bdd304bfe 100644 --- a/lazyllm/tools/rag/document.py +++ b/lazyllm/tools/rag/document.py @@ -58,6 +58,7 @@ def __init__(self, dataset_path: Optional[str], embed: Optional[Union[Callable, self._dataset_path = dataset_path self._embed = self._get_embeds(embed) self._processor = processor + self._schema_extractor = self._register_submodules(schema_extractor) name = name or DocListManager.DEFAULT_GROUP_NAME if not display_name: display_name = name @@ -90,21 +91,22 @@ def web_url(self): def _get_embeds(self, embed): embeds = embed if isinstance(embed, dict) else {EMBED_DEFAULT_KEY: embed} if embed else {} - for embed in embeds.values(): - if isinstance(embed, ModuleBase): - self._submodules.append(embed) - return embeds - - def add_kb_group(self, name, doc_fields: Optional[Dict[str, DocField]] = None, - store_conf: Optional[Dict] = None, - embed: Optional[Union[Callable, Dict[str, Callable]]] = None): + return self._register_submodules(embeds) + + def _register_submodules(self, m): + if not m: return m + for embed in (m.values() if isinstance(m, dict) else m if isinstance(m, (tuple, list)) else [m]): + if isinstance(embed, ModuleBase): self._submodules.append(embed) + return m + + def add_kb_group(self, name, doc_fields: Optional[Dict[str, DocField]] = None, store_conf: Optional[Dict] = None, + embed: Optional[Union[Callable, Dict[str, Callable]]] = None, + schema_extractor: Optional[Union[LLMBase, SchemaExtractor]] = None): embed = self._get_embeds(embed) if embed else self._embed - if isinstance(self._kbs, ServerModule): - self._kbs._impl._m[name] = DocImpl(dlm=self._dlm, embed=embed, kb_group_name=name, - global_metadata_desc=doc_fields, store=store_conf) - else: - self._kbs[name] = DocImpl(dlm=self._dlm, embed=self._embed, kb_group_name=name, - global_metadata_desc=doc_fields, store=store_conf) + schema_extractor = self._register_submodules(schema_extractor) or self._schema_extractor + impl = DocImpl(dlm=self._dlm, embed=embed, kb_group_name=name, global_metadata_desc=doc_fields, + store=store_conf, schema_extractor=schema_extractor) + (self._kbs._impl._m if isinstance(self._kbs, ServerModule) else self._kbs)[name] = impl self._dlm.add_kb_group(name=name) def get_doc_by_kb_group(self, name): @@ -151,7 +153,6 @@ def __init__(self, dataset_path: Optional[str] = None, embed: Optional[Union[Cal 'Only map store is supported for Document with temp-files') name = name or DocListManager.DEFAULT_GROUP_NAME - self._schema_extractor: SchemaExtractor = schema_extractor if isinstance(manager, Document._Manager): assert not server, 'Server infomation is already set to by manager' @@ -161,7 +162,8 @@ def __init__(self, dataset_path: Optional[str] = None, embed: Optional[Union[Cal if dataset_path != manager._dataset_path and dataset_path != manager._origin_path: raise RuntimeError(f'Document path mismatch, expected `{manager._dataset_path}`' f'while received `{dataset_path}`') - manager.add_kb_group(name=name, doc_fields=doc_fields, store_conf=store_conf, embed=embed) + manager.add_kb_group(name=name, doc_fields=doc_fields, store_conf=store_conf, embed=embed, + schema_extractor=schema_extractor) self._manager = manager self._curr_group = name else: @@ -177,7 +179,7 @@ def __init__(self, dataset_path: Optional[str] = None, embed: Optional[Union[Cal self._manager = Document._Manager(dataset_path, embed, manager, server, name, launcher, store_conf, doc_fields, cloud=cloud, doc_files=doc_files, processor=processor, display_name=display_name, description=description, - schema_extractor=self._schema_extractor) + schema_extractor=schema_extractor) self._curr_group = name self._doc_to_db_processor: DocToDbProcessor = None self._graph_document: weakref.ref = None diff --git a/lazyllm/tools/rag/parsing_service/impl.py b/lazyllm/tools/rag/parsing_service/impl.py index 9b9c7b13a..b6dd4c0d8 100644 --- a/lazyllm/tools/rag/parsing_service/impl.py +++ b/lazyllm/tools/rag/parsing_service/impl.py @@ -5,6 +5,7 @@ from collections import defaultdict, deque from concurrent.futures import ThreadPoolExecutor from functools import cached_property +from itertools import repeat from lazyllm import LOG @@ -95,12 +96,9 @@ def add_doc(self, input_files: List[str], ids: Optional[List[str]] = None, # no try: if not input_files: return if not ids: ids = [gen_docid(path) for path in input_files] - if metadatas is None: - metadatas = [{} for _ in input_files] - for metadata, doc_id, path in zip(metadatas, ids, input_files): - metadata.setdefault(RAG_DOC_ID, doc_id) - metadata.setdefault(RAG_DOC_PATH, path) - metadata.setdefault(RAG_KB_ID, kb_id or DEFAULT_KB_ID) + temp_metas = [{RAG_DOC_ID: doc_id, RAG_DOC_PATH: path, RAG_KB_ID: kb_id or DEFAULT_KB_ID} + for doc_id, path in zip(ids, input_files)] + metadatas = [{**temp, **(metadata)} for metadata, temp in zip(metadatas or repeat({}), temp_metas)] kb_id = metadatas[0].get(RAG_KB_ID, DEFAULT_KB_ID) if kb_id is None else kb_id root_nodes = self._reader.load_data(input_files, metadatas, split_nodes_by_type=True) schema_futures = [] diff --git a/tests/basic_tests/RAG/test_document.py b/tests/basic_tests/RAG/test_document.py index 649b74d68..5db8f2319 100644 --- a/tests/basic_tests/RAG/test_document.py +++ b/tests/basic_tests/RAG/test_document.py @@ -85,7 +85,7 @@ def test_add_files(self): new_doc = DocNode(text='new dummy text', group=LAZY_ROOT_NAME) new_doc._global_metadata = {RAG_DOC_ID: gen_docid(self.tmp_file_b.name), RAG_DOC_PATH: self.tmp_file_b.name} self.mock_directory_reader.load_data.return_value = {LAZY_ROOT_NAME: [new_doc], LAZY_IMAGE_GROUP: []} - self.doc_impl._add_doc_to_store([self.tmp_file_b.name]) + self.doc_impl._processor._add_doc([self.tmp_file_b.name]) assert len(self.doc_impl.store.get_nodes(group=LAZY_ROOT_NAME)) == 2 class TestDocument(unittest.TestCase):