Skip to content

Commit 34283d4

Browse files
authored
Feat: add data source to pipeline logs. (infiniflow#11075)
### What problem does this PR solve? infiniflow#10953 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
1 parent 5629fbd commit 34283d4

7 files changed

Lines changed: 45 additions & 45 deletions

File tree

api/apps/connector_app.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from flask_login import login_required, current_user
2020

2121
from api.db import InputType
22-
from api.db.services.connector_service import ConnectorService, Connector2KbService, SyncLogsService
22+
from api.db.services.connector_service import ConnectorService, SyncLogsService
2323
from api.utils.api_utils import get_json_result, validate_request, get_data_error_result
2424
from common.misc_utils import get_uuid
2525
from common.constants import RetCode, TaskStatus
@@ -88,14 +88,14 @@ def resume(connector_id):
8888
return get_json_result(data=True)
8989

9090

91-
@manager.route("/<connector_id>/link", methods=["POST"]) # noqa: F821
92-
@validate_request("kb_ids")
91+
@manager.route("/<connector_id>/rebuild", methods=["PUT"]) # noqa: F821
9392
@login_required
94-
def link_kb(connector_id):
93+
@validate_request("kb_id")
94+
def rebuild(connector_id):
9595
req = request.json
96-
errors = Connector2KbService.link_kb(connector_id, req["kb_ids"], current_user.id)
97-
if errors:
98-
return get_json_result(data=False, message=errors, code=RetCode.SERVER_ERROR)
96+
err = ConnectorService.rebuild(connector_id, req["kb_id"], current_user.id)
97+
if err:
98+
return get_json_result(data=False, message=err, code=RetCode.SERVER_ERROR)
9999
return get_json_result(data=True)
100100

101101

api/apps/document_app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ def list_docs():
260260
for doc_item in docs:
261261
if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
262262
doc_item["thumbnail"] = f"/v1/document/image/{kb_id}-{doc_item['thumbnail']}"
263+
if doc_item.get("source_type"):
264+
doc_item["source_type"] = doc_item["source_type"].split("/")[0]
263265

264266
return get_json_result(data={"total": tol, "docs": docs})
265267
except Exception as e:

api/db/db_models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,7 @@ class Connector2Kb(DataBaseModel):
10641064
id = CharField(max_length=32, primary_key=True)
10651065
connector_id = CharField(max_length=32, null=False, index=True)
10661066
kb_id = CharField(max_length=32, null=False, index=True)
1067+
auto_parse = CharField(max_length=1, null=False, default="1", index=False)
10671068

10681069
class Meta:
10691070
db_table = "connector2kb"
@@ -1282,4 +1283,8 @@ def migrate_db():
12821283
migrate(migrator.add_column("tenant_llm", "status", CharField(max_length=1, null=False, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)))
12831284
except Exception:
12841285
pass
1286+
try:
1287+
migrate(migrator.add_column("connector2kb", "auto_parse", CharField(max_length=1, null=False, default="1", index=False)))
1288+
except Exception:
1289+
pass
12851290
logging.disable(logging.NOTSET)

api/db/services/connector_service.py

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ def resume(cls, connector_id, status):
5454
SyncLogsService.update_by_id(task["id"], task)
5555
ConnectorService.update_by_id(connector_id, {"status": status})
5656

57-
5857
@classmethod
5958
def list(cls, tenant_id):
6059
fields = [
@@ -67,6 +66,15 @@ def list(cls, tenant_id):
6766
cls.model.tenant_id == tenant_id
6867
).dicts())
6968

69+
@classmethod
70+
def rebuild(cls, kb_id:str, connector_id: str, tenant_id:str):
71+
e, conn = cls.get_by_id(connector_id)
72+
if not e:
73+
return
74+
SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id])
75+
docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
76+
return FileService.delete_docs([d.id for d in docs], tenant_id)
77+
7078

7179
class SyncLogsService(CommonService):
7280
model = SyncLogs
@@ -91,6 +99,7 @@ def list_sync_tasks(cls, connector_id=None, page_number=None, items_per_page=15)
9199
Connector.timeout_secs,
92100
Knowledgebase.name.alias("kb_name"),
93101
Knowledgebase.avatar.alias("kb_avatar"),
102+
Connector2Kb.auto_parse,
94103
cls.model.from_beginning.alias("reindex"),
95104
cls.model.status
96105
]
@@ -179,7 +188,7 @@ def increase_docs(cls, id, min_update, max_update, doc_num, err_msg="", error_co
179188
.where(cls.model.id == id).execute()
180189

181190
@classmethod
182-
def duplicate_and_parse(cls, kb, docs, tenant_id, src):
191+
def duplicate_and_parse(cls, kb, docs, tenant_id, src, auto_parse=True):
183192
if not docs:
184193
return None
185194

@@ -191,14 +200,17 @@ def read(self) -> bytes:
191200
return self.blob
192201

193202
errs = []
194-
files = [FileObj(filename=d["semantic_identifier"]+f".{d['extension']}", blob=d["blob"]) for d in docs]
203+
files = [FileObj(filename=d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else ""), blob=d["blob"]) for d in docs]
195204
doc_ids = []
196205
err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src)
197206
errs.extend(err)
207+
198208
kb_table_num_map = {}
199209
for doc, _ in doc_blob_pairs:
200-
DocumentService.run(tenant_id, doc, kb_table_num_map)
201210
doc_ids.append(doc["id"])
211+
if not auto_parse or auto_parse == "0":
212+
continue
213+
DocumentService.run(tenant_id, doc, kb_table_num_map)
202214

203215
return errs, doc_ids
204216

@@ -213,33 +225,6 @@ def get_latest_task(cls, connector_id, kb_id):
213225
class Connector2KbService(CommonService):
214226
model = Connector2Kb
215227

216-
@classmethod
217-
def link_kb(cls, conn_id:str, kb_ids: list[str], tenant_id:str):
218-
arr = cls.query(connector_id=conn_id)
219-
old_kb_ids = [a.kb_id for a in arr]
220-
for kb_id in kb_ids:
221-
if kb_id in old_kb_ids:
222-
continue
223-
cls.save(**{
224-
"id": get_uuid(),
225-
"connector_id": conn_id,
226-
"kb_id": kb_id
227-
})
228-
SyncLogsService.schedule(conn_id, kb_id, reindex=True)
229-
230-
errs = []
231-
e, conn = ConnectorService.get_by_id(conn_id)
232-
for kb_id in old_kb_ids:
233-
if kb_id in kb_ids:
234-
continue
235-
cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id])
236-
SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL})
237-
docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
238-
err = FileService.delete_docs([d.id for d in docs], tenant_id)
239-
if err:
240-
errs.append(err)
241-
return "\n".join(errs)
242-
243228
@classmethod
244229
def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str):
245230
arr = cls.query(kb_id=kb_id)
@@ -260,11 +245,15 @@ def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str):
260245
continue
261246
cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id])
262247
e, conn = ConnectorService.get_by_id(conn_id)
263-
SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL})
264-
docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
265-
err = FileService.delete_docs([d.id for d in docs], tenant_id)
266-
if err:
267-
errs.append(err)
248+
if not e:
249+
continue
250+
#SyncLogsService.filter_delete([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id])
251+
# Do not delete docs while unlinking.
252+
SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status.in_([TaskStatus.SCHEDULE, TaskStatus.RUNNING])], {"status": TaskStatus.CANCEL})
253+
#docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
254+
#err = FileService.delete_docs([d.id for d in docs], tenant_id)
255+
#if err:
256+
# errs.append(err)
268257
return "\n".join(errs)
269258

270259
@classmethod
@@ -282,3 +271,5 @@ def list_connectors(cls, kb_id):
282271
).dicts()
283272
)
284273

274+
275+

api/db/services/pipeline_operation_log_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def create(cls, document_id, pipeline_id, task_type, fake_document_ids=[], dsl:
159159
document_name=document.name,
160160
document_suffix=document.suffix,
161161
document_type=document.type,
162-
source_from="", # TODO: add in the future
162+
source_from=document.source_type.split("/")[0],
163163
progress=document.progress,
164164
progress_msg=document.progress_msg,
165165
process_begin_at=document.process_begin_at,

common/data_source/notion_connector.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ def _read_pages(
253253
all_child_page_ids: list[str] = []
254254

255255
for page in pages:
256+
if isinstance(page, dict):
257+
page = NotionPage(**page)
256258
if page.id in self.indexed_pages:
257259
logging.debug(f"Already indexed page with ID '{page.id}'. Skipping.")
258260
continue

rag/svr/sync_data_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async def __call__(self, task: dict):
7878
} for doc in document_batch]
7979

8080
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
81-
err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}")
81+
err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])
8282
SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
8383
doc_num += len(docs)
8484

0 commit comments

Comments (0)