Skip to content

Commit 57aec2e

Browse files
authored
Fix bug: running Knowledge graph or RAPTOR wrongly updates an existing task (infiniflow#14102)
### What problem does this PR solve? It fixes bug infiniflow#14101: when running Knowledge graph or RAPTOR, the last document's running status is wrongly set, as shown below. The task should never touch an existing document's result. ![Image](https://github.com/user-attachments/assets/14fe1f9e-0541-4093-8111-ed0bd25b87ba) ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
1 parent 27ebc64 commit 57aec2e

3 files changed

Lines changed: 7 additions & 9 deletions

File tree

api/apps/kb_app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ async def run_mindmap():
773773
sample_document = documents[0]
774774
document_ids = [document["id"] for document in documents]
775775

776-
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="mindmap", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
776+
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="mindmap", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
777777

778778
if not KnowledgebaseService.update_by_id(kb.id, {"mindmap_task_id": task_id}):
779779
logging.warning(f"Cannot save mindmap_task_id for kb {kb_id}")

api/apps/services/dataset_api_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ def run_graphrag(dataset_id: str, tenant_id: str):
444444
sample_document = documents[0]
445445
document_ids = [document["id"] for document in documents]
446446

447-
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="graphrag", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
447+
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="graphrag", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
448448

449449
if not KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id}):
450450
logging.warning(f"Cannot save graphrag_task_id for Dataset {dataset_id}")
@@ -523,7 +523,7 @@ def run_raptor(dataset_id: str, tenant_id: str):
523523
sample_document = documents[0]
524524
document_ids = [document["id"] for document in documents]
525525

526-
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="raptor", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
526+
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="raptor", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
527527

528528
if not KnowledgebaseService.update_by_id(kb.id, {"raptor_task_id": task_id}):
529529
logging.warning(f"Cannot save raptor_task_id for Dataset {dataset_id}")

api/db/services/document_service.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -984,23 +984,22 @@ def run(cls, tenant_id: str, doc: dict, kb_table_num_map: dict):
984984
queue_tasks(doc, bucket, name, 0)
985985

986986

987-
def queue_raptor_o_graphrag_tasks(sample_doc_id, ty, priority, fake_doc_id="", doc_ids=[]):
987+
def queue_raptor_o_graphrag_tasks(sample_doc, ty, priority, fake_doc_id="", doc_ids=[]):
988988
"""
989989
You can provide a fake_doc_id to bypass the restriction of tasks at the knowledgebase level.
990990
Optionally, specify a list of doc_ids to determine which documents participate in the task.
991991
"""
992992
assert ty in ["graphrag", "raptor", "mindmap"], "type should be graphrag, raptor or mindmap"
993993

994-
chunking_config = DocumentService.get_chunking_config(sample_doc_id["id"])
994+
chunking_config = DocumentService.get_chunking_config(sample_doc["id"])
995995
hasher = xxhash.xxh64()
996996
for field in sorted(chunking_config.keys()):
997997
hasher.update(str(chunking_config[field]).encode("utf-8"))
998998

999999
def new_task():
1000-
nonlocal sample_doc_id
10011000
return {
10021001
"id": get_uuid(),
1003-
"doc_id": sample_doc_id["id"],
1002+
"doc_id": fake_doc_id,
10041003
"from_page": 100000000,
10051004
"to_page": 100000000,
10061005
"task_type": ty,
@@ -1015,9 +1014,8 @@ def new_task():
10151014
task["digest"] = hasher.hexdigest()
10161015
bulk_insert_into_db(Task, [task], True)
10171016

1018-
task["doc_id"] = fake_doc_id
10191017
task["doc_ids"] = doc_ids
1020-
DocumentService.begin2parse(sample_doc_id["id"], keep_progress=True)
1018+
DocumentService.begin2parse(task["doc_id"], keep_progress=True)
10211019
assert REDIS_CONN.queue_product(settings.get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status."
10221020
return task["id"]
10231021

0 commit comments

Comments
 (0)