2323import threading
2424import time
2525
26- import json_repair
27-
2826from api .db import PIPELINE_SPECIAL_PROGRESS_FREEZE_TASK_TYPES
2927from api .db .services .knowledgebase_service import KnowledgebaseService
3028from api .db .services .pipeline_operation_log_service import PipelineOperationLogService
3129from common .connection_utils import timeout
30+ from common .metadata_utils import update_metadata_to , metadata_schema
3231from rag .utils .base64_image import image2id
3332from rag .utils .raptor_utils import should_skip_raptor , get_skip_reason
3433from common .log_utils import init_root_logger
3534from common .config_utils import show_configs
3635from graphrag .general .index import run_graphrag_for_kb
3736from graphrag .utils import get_llm_cache , set_llm_cache , get_tags_from_cache , set_tags_to_cache
38- from rag .prompts .generator import keyword_extraction , question_proposal , content_tagging , run_toc_from_text
37+ from rag .prompts .generator import keyword_extraction , question_proposal , content_tagging , run_toc_from_text , \
38+ gen_metadata
3939import logging
4040import os
4141from datetime import datetime
@@ -368,6 +368,45 @@ async def doc_question_proposal(chat_mdl, d, topn):
368368 raise
369369 progress_callback (msg = "Question generation {} chunks completed in {:.2f}s" .format (len (docs ), timer () - st ))
370370
371+ if task ["parser_config" ].get ("enable_metadata" , False ) and task ["parser_config" ].get ("metadata" ):
372+ st = timer ()
373+ progress_callback (msg = "Start to generate meta-data for every chunk ..." )
374+ chat_mdl = LLMBundle (task ["tenant_id" ], LLMType .CHAT , llm_name = task ["llm_id" ], lang = task ["language" ])
375+
376+ async def gen_metadata_task (chat_mdl , d ):
377+ cached = get_llm_cache (chat_mdl .llm_name , d ["content_with_weight" ], "metadata" )
378+ if not cached :
379+ async with chat_limiter :
380+ cached = await gen_metadata (chat_mdl ,
381+ metadata_schema (task ["parser_config" ]["metadata" ]),
382+ d ["content_with_weight" ])
383+ set_llm_cache (chat_mdl .llm_name , d ["content_with_weight" ], cached , "metadata" )
384+ if cached :
385+ d ["metadata_obj" ] = cached
386+ tasks = []
387+ for d in docs :
388+ tasks .append (asyncio .create_task (gen_metadata_task (chat_mdl , d )))
389+ try :
390+ await asyncio .gather (* tasks , return_exceptions = False )
391+ except Exception as e :
392+ logging .error ("Error in doc_question_proposal" , exc_info = e )
393+ for t in tasks :
394+ t .cancel ()
395+ await asyncio .gather (* tasks , return_exceptions = True )
396+ raise
397+ metadata = {}
398+ for ck in cks :
399+ metadata = update_metadata_to (metadata , ck ["metadata_obj" ])
400+ del ck ["metadata_obj" ]
401+ if metadata :
402+ e , doc = DocumentService .get_by_id (task ["doc_id" ])
403+ if e :
404+ if isinstance (doc .meta_fields , str ):
405+ doc .meta_fields = json .loads (doc .meta_fields )
406+ metadata = update_metadata_to (metadata , doc .meta_fields )
407+ DocumentService .update_by_id (task ["doc_id" ], {"meta_fields" : metadata })
408+ progress_callback (msg = "Question generation {} chunks completed in {:.2f}s" .format (len (docs ), timer () - st ))
409+
371410 if task ["kb_parser_config" ].get ("tag_kb_ids" , []):
372411 progress_callback (msg = "Start to tag for every chunk ..." )
373412 kb_ids = task ["kb_parser_config" ]["tag_kb_ids" ]
@@ -602,36 +641,6 @@ def batch_encode(txts):
602641
603642
604643 metadata = {}
# NOTE(review): legacy closure removed by this diff in favor of the shared
# common.metadata_utils.update_metadata_to helper; shown here as deletion
# lines. It merged one chunk's metadata into the enclosing `metadata` dict.
605- def dict_update (meta ):
606- nonlocal metadata
# Empty/None metadata is a no-op.
607- if not meta :
608- return
# String input is treated as (possibly malformed) JSON and repaired/parsed;
# unparseable strings are logged and dropped.
609- if isinstance (meta , str ):
610- try :
611- meta = json_repair .loads (meta )
612- except Exception :
613- logging .error ("Meta data format error." )
614- return
# Anything that is not a dict after parsing is ignored.
615- if not isinstance (meta , dict ):
616- return
617- for k , v in meta .items ():
# List values are filtered down to their string elements; an emptied list
# is skipped entirely.
618- if isinstance (v , list ):
619- v = [vv for vv in v if isinstance (vv , str )]
620- if not v :
621- continue
# Only str or list values are merged; other value types are ignored.
622- if not isinstance (v , list ) and not isinstance (v , str ):
623- continue
# New keys are inserted as-is.
624- if k not in metadata :
625- metadata [k ] = v
626- continue
# Existing list values absorb the new value (extend for list, append for
# scalar); existing scalar values are simply overwritten.
627- if isinstance (metadata [k ], list ):
628- if isinstance (v , list ):
629- metadata [k ].extend (v )
630- else :
631- metadata [k ].append (v )
632- else :
633- metadata [k ] = v
634-
635644 for ck in chunks :
636645 ck ["doc_id" ] = doc_id
637646 ck ["kb_id" ] = [str (task ["kb_id" ])]
@@ -656,7 +665,7 @@ def dict_update(meta):
656665 ck ["content_sm_ltks" ] = rag_tokenizer .fine_grained_tokenize (ck ["content_ltks" ])
657666 del ck ["summary" ]
658667 if "metadata" in ck :
659- dict_update ( ck ["metadata" ])
668+ metadata = update_metadata_to ( metadata , ck ["metadata" ])
660669 del ck ["metadata" ]
661670 if "content_with_weight" not in ck :
662671 ck ["content_with_weight" ] = ck ["text" ]
@@ -670,7 +679,7 @@ def dict_update(meta):
670679 if e :
671680 if isinstance (doc .meta_fields , str ):
672681 doc .meta_fields = json .loads (doc .meta_fields )
673- dict_update ( doc .meta_fields )
682+ metadata = update_metadata_to ( metadata , doc .meta_fields )
674683 DocumentService .update_by_id (doc_id , {"meta_fields" : metadata })
675684
676685 start_ts = timer ()
0 commit comments