2 changes: 1 addition & 1 deletion api/apps/canvas_app.py
@@ -147,7 +147,7 @@ async def run():
     if cvs.canvas_category == CanvasCategory.DataFlow:
         task_id = get_uuid()
         Pipeline(cvs.dsl, tenant_id=current_user.id, doc_id=CANVAS_DEBUG_DOC_ID, task_id=task_id, flow_id=req["id"])
-        ok, error_message = await asyncio.to_thread(queue_dataflow, user_id, req["id"], task_id, files[0], 0)
+        ok, error_message = await asyncio.to_thread(queue_dataflow, user_id, req["id"], task_id, CANVAS_DEBUG_DOC_ID, files[0], 0)
         if not ok:
             return get_data_error_result(message=error_message)
         return get_json_result(data={"message_id": task_id})
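The debug run now threads the sentinel CANVAS_DEBUG_DOC_ID through to queue_dataflow, matching the doc_id already given to the Pipeline above. Since asyncio.to_thread simply forwards positional arguments to the sync callable, the new argument slots in before files[0]; a minimal sketch of the mechanism (the stub below is illustrative, not the real queue_dataflow):

    import asyncio

    def queue_dataflow(user_id, flow_id, task_id, doc_id, file, priority):
        # Stand-in for the real queue function; returns (ok, error_message).
        return True, ""

    async def main():
        # asyncio.to_thread(fn, *args) runs fn(*args) on a worker thread,
        # so adding doc_id is purely a positional-argument change.
        ok, err = await asyncio.to_thread(queue_dataflow, "u1", "flow1", "t1", "DEBUG_DOC", {"name": "a.pdf"}, 0)
        print(ok, err)

    asyncio.run(main())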
1 change: 1 addition & 0 deletions api/apps/chunk_app.py
@@ -386,6 +386,7 @@ async def _retrieval():
                                          LLMBundle(kb.tenant_id, LLMType.CHAT))
     if ck["content_with_weight"]:
         ranks["chunks"].insert(0, ck)
+    ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids)

     for c in ranks["chunks"]:
         c.pop("vector", None)
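retrieval_by_children is the read-side half of the parent/child chunking introduced in rag/nlp below: matches land on small child chunks, each of which carries its parent's full text in "mom_with_weight". The retriever's implementation is not part of this diff; a rough sketch of the assumed behavior (names taken from the write side, logic illustrative only):

    def retrieval_by_children_sketch(chunks: list[dict]) -> list[dict]:
        # Children store the parent chunk's text under "mom_with_weight"
        # (written in tokenize_chunks below). Surface each parent once,
        # at the rank of its best-scoring child.
        seen_parents, out = set(), []
        for ck in chunks:
            parent = ck.get("mom_with_weight")
            if parent:
                if parent in seen_parents:
                    continue
                seen_parents.add(parent)
                ck = {**ck, "content_with_weight": parent}
            out.append(ck)
        return out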
65 changes: 64 additions & 1 deletion common/metadata_utils.py
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from typing import Any, Callable
+import logging
+from typing import Any, Callable, Dict
+
+import json_repair

 from rag.prompts.generator import gen_meta_filter

@@ -140,3 +143,63 @@ async def apply_meta_data_filter(
             doc_ids = ["-999"]

     return doc_ids


+def update_metadata_to(metadata, meta):
+    if not meta:
+        return metadata
+    if isinstance(meta, str):
+        try:
+            meta = json_repair.loads(meta)
+        except Exception:
+            logging.error("Meta data format error.")
+            return metadata
+    if not isinstance(meta, dict):
+        return metadata
+    for k, v in meta.items():
+        if isinstance(v, list):
+            v = [vv for vv in v if isinstance(vv, str)]
+            if not v:
+                continue
+        if not isinstance(v, list) and not isinstance(v, str):
+            continue
+        if k not in metadata:
+            metadata[k] = v
+            continue
+        if isinstance(metadata[k], list):
+            if isinstance(v, list):
+                metadata[k].extend(v)
+            else:
+                metadata[k].append(v)
+        else:
+            metadata[k] = v
+
+    return metadata
+
+
+def metadata_schema(metadata: list | None) -> Dict[str, Any]:
+    if not metadata:
+        return {}
+    properties = {}
+
+    for item in metadata:
+        key = item.get("key")
+        if not key:
+            continue
+
+        prop_schema = {
+            "description": item.get("description", "")
+        }
+        if "enum" in item and item["enum"]:
+            prop_schema["enum"] = item["enum"]
+        prop_schema["type"] = "string"
+
+        properties[key] = prop_schema
+
+    json_schema = {
+        "type": "object",
+        "properties": properties,
+    }
+
+    json_schema["additionalProperties"] = False
+    return json_schema
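A quick sanity check of the two helpers above; all values are invented for illustration:

    from common.metadata_utils import metadata_schema, update_metadata_to

    merged = {}
    merged = update_metadata_to(merged, {"author": "Alice", "tags": ["rag"]})
    # JSON strings are repair-parsed before merging:
    merged = update_metadata_to(merged, '{"author": "Bob", "tags": ["llm"]}')
    print(merged)  # {'author': 'Bob', 'tags': ['rag', 'llm']}: scalars overwrite, lists extend

    schema = metadata_schema([
        {"key": "author", "description": "Document author"},
        {"key": "category", "description": "Topic label", "enum": ["rag", "llm"]},
    ])
    print(schema["properties"]["category"])
    # {'description': 'Topic label', 'enum': ['rag', 'llm'], 'type': 'string'}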
2 changes: 1 addition & 1 deletion graphrag/general/extractor.py
@@ -78,7 +78,7 @@ def _chat(self, system, history, gen_conf={}, task_id=""):
             raise TaskCanceledException(f"Task {task_id} was cancelled")

         try:
-            response = self._llm.chat(system_msg[0]["content"], hist, conf)
+            response = asyncio.run(self._llm.async_chat(system_msg[0]["content"], hist, conf))
             response = re.sub(r"^.*</think>", "", response, flags=re.DOTALL)
             if response.find("**ERROR**") >= 0:
                 raise Exception(response)
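Swapping chat for async_chat keeps _chat itself synchronous by bridging with asyncio.run, which spins up a fresh event loop per call. That only works when the caller is not already inside a running loop, which is presumably why the extractor drives this from plain worker threads. A minimal illustration of the constraint (toy coroutine, not the real LLM client):

    import asyncio

    async def async_chat(prompt: str) -> str:
        await asyncio.sleep(0)  # stands in for network I/O
        return f"echo: {prompt}"

    def sync_chat(prompt: str) -> str:
        # Fine from a plain thread; raises "asyncio.run() cannot be called
        # from a running event loop" if the caller is already async.
        return asyncio.run(async_chat(prompt))

    print(sync_chat("hi"))  # echo: hi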
11 changes: 8 additions & 3 deletions rag/app/naive.py
@@ -635,9 +635,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
"parser_config", {
"chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True})

child_deli = re.findall(r"`([^`]+)`", parser_config.get("children_delimiter", ""))
child_deli = sorted(set(child_deli), key=lambda x: -len(x))
child_deli = "|".join(re.escape(t) for t in child_deli if t)
child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
cust_child_deli = re.findall(r"`([^`]+)`", child_deli)
child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli))
if cust_child_deli:
cust_child_deli = sorted(set(cust_child_deli), key=lambda x: -len(x))
cust_child_deli = "|".join(re.escape(t) for t in cust_child_deli if t)
child_deli += cust_child_deli

is_markdown = False
table_context_size = max(0, int(parser_config.get("table_context_size", 0) or 0))
image_context_size = max(0, int(parser_config.get("image_context_size", 0) or 0))
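Worked through on a small config, assuming children_delimiter is stored with literal escape sequences and backtick-quoted multi-character delimiters (values invented):

    import re

    raw = r"\n`##`"  # stored form: a literal backslash-n plus one backticked delimiter
    s = raw.encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')  # "\n`##`"
    cust = re.findall(r"`([^`]+)`", s)                                     # ['##']
    singles = "|".join(re.escape(c) for c in re.sub(r"`([^`]+)`", "", s))  # escaped "\n"
    pattern = "|".join(p for p in ("|".join(map(re.escape, cust)), singles) if p)
    print(re.split(r"(%s)" % pattern, "a##b\nc"))  # ['a', '##', 'b', '\n', 'c']

Escaping the single-character delimiters matters because defaults like "?" and "!" are regex metacharacters; an unescaped "?" would make the alternation invalid.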
9 changes: 1 addition & 8 deletions rag/flow/splitter/splitter.py
@@ -60,14 +60,7 @@ async def _invoke(self, **kwargs):
                 deli += f"`{d}`"
             else:
                 deli += d
-        child_deli = ""
-        for d in self._param.children_delimiters:
-            if len(d) > 1:
-                child_deli += f"`{d}`"
-            else:
-                child_deli += d
-        child_deli = [m.group(1) for m in re.finditer(r"`([^`]+)`", child_deli)]
-        custom_pattern = "|".join(re.escape(t) for t in sorted(set(child_deli), key=len, reverse=True))
+        custom_pattern = "|".join(re.escape(t) for t in sorted(set(self._param.children_delimiters), key=len, reverse=True))

         self.set_output("output_format", "chunks")
         self.callback(random.randint(1, 5) / 100.0, "Start to split into chunks.")
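The sort by length matters whenever delimiters share a prefix: regex alternation takes the first branch that matches, so a shorter delimiter would otherwise shadow a longer one. For example:

    import re

    delims = ["\n", "\n\n"]
    naive = "|".join(map(re.escape, delims))
    longest_first = "|".join(re.escape(t) for t in sorted(set(delims), key=len, reverse=True))
    print(re.split(f"({naive})", "a\n\nb"))          # ['a', '\n', '', '\n', 'b']
    print(re.split(f"({longest_first})", "a\n\nb"))  # ['a', '\n\n', 'b']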
25 changes: 17 additions & 8 deletions rag/nlp/__init__.py
@@ -273,6 +273,21 @@ def tokenize(d, txt, eng):
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])


def split_with_pattern(d, pattern:str, content:str, eng) -> list:
docs = []
txts = [txt for txt in re.split(r"(%s)" % pattern, content, flags=re.DOTALL)]
for j in range(0, len(txts), 2):
txt = txts[j]
if not txt:
continue
if j + 1 < len(txts):
txt += txts[j+1]
dd = copy.deepcopy(d)
tokenize(dd, txt, eng)
docs.append(dd)
return docs


def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=None):
res = []
# wrap up as es documents
@@ -293,10 +308,7 @@ def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=

         if child_delimiters_pattern:
             d["mom_with_weight"] = ck
-            for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL):
-                dd = copy.deepcopy(d)
-                tokenize(dd, txt, eng)
-                res.append(dd)
+            res.extend(split_with_pattern(d, child_delimiters_pattern, ck, eng))
             continue

         tokenize(d, ck, eng)
@@ -316,10 +328,7 @@ def tokenize_chunks_with_images(chunks, doc, eng, images, child_delimiters_patte
         add_positions(d, [[ii]*5])
         if child_delimiters_pattern:
             d["mom_with_weight"] = ck
-            for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL):
-                dd = copy.deepcopy(d)
-                tokenize(dd, txt, eng)
-                res.append(dd)
+            res.extend(split_with_pattern(d, child_delimiters_pattern, ck, eng))
             continue
         tokenize(d, ck, eng)
         res.append(d)
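split_with_pattern splits with a capturing group and glues each delimiter back onto the segment before it, so no text is lost across child chunks. The pairing logic in isolation:

    import re

    txts = re.split(r"(\n|##)", "intro##body\ntail", flags=re.DOTALL)
    # txts == ['intro', '##', 'body', '\n', 'tail']; even indexes are segments,
    # odd indexes are the captured delimiters that get re-attached.
    pairs = [txts[j] + (txts[j + 1] if j + 1 < len(txts) else "")
             for j in range(0, len(txts), 2) if txts[j]]
    print(pairs)  # ['intro##', 'body\n', 'tail']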
10 changes: 10 additions & 0 deletions rag/prompts/generator.py
@@ -821,3 +821,13 @@ async def relevant_chunks_with_toc(query: str, toc:list[dict], chat_mdl, topn: i
     except Exception as e:
         logging.exception(e)
         return []
+
+META_DATA = load_prompt("meta_data")
+async def gen_metadata(chat_mdl, schema: dict, content: str):
+    template = PROMPT_JINJA_ENV.from_string(META_DATA)
+    system_prompt = template.render(content=content, schema=schema)
+    user_prompt = "Output: "
+    _, msg = message_fit_in(form_message(system_prompt, user_prompt), chat_mdl.max_length)
+    ans = await chat_mdl.async_chat(msg[0]["content"], msg[1:])
+    return re.sub(r"^.*</think>", "", ans, flags=re.DOTALL)
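gen_metadata returns the raw model answer (minus any think-prefix); the task executor below merges it per chunk. A hedged sketch of a call site, with an invented wrapper name, repair-parsing defensively since the prompt can only ask for bare JSON:

    import json_repair

    from common.metadata_utils import metadata_schema
    from rag.prompts.generator import gen_metadata

    async def extract_metadata(chat_mdl, meta_fields: list, text: str) -> dict:
        # Illustrative helper, not part of this PR: build the JSON schema
        # from the parser config, then repair-parse whatever comes back.
        schema = metadata_schema(meta_fields)
        raw = await gen_metadata(chat_mdl, schema, text)
        parsed = json_repair.loads(raw)
        return parsed if isinstance(parsed, dict) else {}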
13 changes: 13 additions & 0 deletions rag/prompts/meta_data.md
@@ -0,0 +1,13 @@
+Extract important structured information from the given content.
+Output ONLY a valid JSON string with no additional text.
+If no important structured information is found, output an empty JSON object: {}.
+
+The schema of the important structured information is as follows:
+
+{{ schema }}
+
+---------------------------
+The given content is as follows:
+
+{{ content }}
+
79 changes: 44 additions & 35 deletions rag/svr/task_executor.py
@@ -23,19 +23,19 @@
 import threading
 import time

-import json_repair
-
 from api.db import PIPELINE_SPECIAL_PROGRESS_FREEZE_TASK_TYPES
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from common.connection_utils import timeout
+from common.metadata_utils import update_metadata_to, metadata_schema
 from rag.utils.base64_image import image2id
 from rag.utils.raptor_utils import should_skip_raptor, get_skip_reason
 from common.log_utils import init_root_logger
 from common.config_utils import show_configs
 from graphrag.general.index import run_graphrag_for_kb
 from graphrag.utils import get_llm_cache, set_llm_cache, get_tags_from_cache, set_tags_to_cache
-from rag.prompts.generator import keyword_extraction, question_proposal, content_tagging, run_toc_from_text
+from rag.prompts.generator import keyword_extraction, question_proposal, content_tagging, run_toc_from_text, \
+    gen_metadata
 import logging
 import os
 from datetime import datetime
Expand Down Expand Up @@ -369,6 +369,45 @@ async def doc_question_proposal(chat_mdl, d, topn):
             raise
         progress_callback(msg="Question generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))

+    if task["parser_config"].get("enable_metadata", False) and task["parser_config"].get("metadata"):
+        st = timer()
+        progress_callback(msg="Start to generate meta-data for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
+
+        async def gen_metadata_task(chat_mdl, d):
+            cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], "metadata")
+            if not cached:
+                async with chat_limiter:
+                    cached = await gen_metadata(chat_mdl,
+                                                metadata_schema(task["parser_config"]["metadata"]),
+                                                d["content_with_weight"])
+                set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, "metadata")
+            if cached:
+                d["metadata_obj"] = cached
+        tasks = []
+        for d in docs:
+            tasks.append(asyncio.create_task(gen_metadata_task(chat_mdl, d)))
+        try:
+            await asyncio.gather(*tasks, return_exceptions=False)
+        except Exception as e:
+            logging.error("Error in gen_metadata_task", exc_info=e)
+            for t in tasks:
+                t.cancel()
+            await asyncio.gather(*tasks, return_exceptions=True)
+            raise
+        metadata = {}
+        for d in docs:
+            metadata = update_metadata_to(metadata, d.get("metadata_obj"))
+            d.pop("metadata_obj", None)
+        if metadata:
+            e, doc = DocumentService.get_by_id(task["doc_id"])
+            if e:
+                if isinstance(doc.meta_fields, str):
+                    doc.meta_fields = json.loads(doc.meta_fields)
+                metadata = update_metadata_to(metadata, doc.meta_fields)
+                DocumentService.update_by_id(task["doc_id"], {"meta_fields": metadata})
+        progress_callback(msg="Metadata generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))

     if task["kb_parser_config"].get("tag_kb_ids", []):
         progress_callback(msg="Start to tag for every chunk ...")
         kb_ids = task["kb_parser_config"]["tag_kb_ids"]
@@ -603,36 +642,6 @@ def batch_encode(txts):


     metadata = {}
-    def dict_update(meta):
-        nonlocal metadata
-        if not meta:
-            return
-        if isinstance(meta, str):
-            try:
-                meta = json_repair.loads(meta)
-            except Exception:
-                logging.error("Meta data format error.")
-                return
-        if not isinstance(meta, dict):
-            return
-        for k, v in meta.items():
-            if isinstance(v, list):
-                v = [vv for vv in v if isinstance(vv, str)]
-                if not v:
-                    continue
-            if not isinstance(v, list) and not isinstance(v, str):
-                continue
-            if k not in metadata:
-                metadata[k] = v
-                continue
-            if isinstance(metadata[k], list):
-                if isinstance(v, list):
-                    metadata[k].extend(v)
-                else:
-                    metadata[k].append(v)
-            else:
-                metadata[k] = v
-
     for ck in chunks:
         ck["doc_id"] = doc_id
         ck["kb_id"] = [str(task["kb_id"])]
@@ -657,7 +666,7 @@ def dict_update(meta):
ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
del ck["summary"]
if "metadata" in ck:
dict_update(ck["metadata"])
metadata = update_metadata_to(metadata, ck["metadata"])
del ck["metadata"]
if "content_with_weight" not in ck:
ck["content_with_weight"] = ck["text"]
@@ -671,7 +680,7 @@ def dict_update(meta):
     if e:
         if isinstance(doc.meta_fields, str):
             doc.meta_fields = json.loads(doc.meta_fields)
-        dict_update(doc.meta_fields)
+        metadata = update_metadata_to(metadata, doc.meta_fields)
         DocumentService.update_by_id(doc_id, {"meta_fields": metadata})

     start_ts = timer()
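Note the merge order at the document level: per-chunk metadata is accumulated first and the pre-existing doc.meta_fields is merged last, so for scalar keys the previously stored value wins while list values are appended. A toy trace (values invented):

    from common.metadata_utils import update_metadata_to

    metadata = {}
    metadata = update_metadata_to(metadata, '{"year": "2023", "topic": ["rag"]}')  # from chunk 1
    metadata = update_metadata_to(metadata, {"topic": ["agents"]})                 # from chunk 2
    metadata = update_metadata_to(metadata, {"year": "2024"})                      # existing doc.meta_fields
    print(metadata)  # {'year': '2024', 'topic': ['rag', 'agents']}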