Skip to content

Commit d62438c

Browse files
committed
feat(kb): implement shared ingestion executor and simplify delete operations
Backend changes:
- Add shared thread pool _ingest_executor to prevent global thread pool exhaustion
- Implement graceful shutdown with timeout mechanism (30s) for ingestion executor
- Fix rename_collection_api cross-user data pollution by adding user_id filter
- Improve rename_collection_api transaction safety with pre-validation
- Simplify delete_document_api: remove complex UploadedFile and physical file handling
- Physical file cleanup is now handled at collection level only

Frontend changes:
- Extend IngestionResult interface with embedding_count, vector_count, warnings, failed_step
- Add toast warning when embedding_count is 0 but chunks_count > 0
- Improve error handling for both document and web ingestion with partial status warnings
1 parent bf43ec4 commit d62438c

3 files changed

Lines changed: 163 additions & 43 deletions

File tree

frontend/src/components/kb/knowledge-base-creation-dialog.tsx

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ interface IngestionResult {
3333
collection: string
3434
document_count: number
3535
chunks_count: number
36+
embedding_count?: number
37+
vector_count?: number
3638
status: string
3739
message: string
40+
warnings?: string[]
41+
failed_step?: string
3842
}
3943

4044
interface WebIngestionResult {
@@ -287,13 +291,24 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
287291
throw new Error(errorData.detail || t("kb.errors.uploadFailedFile", { name: file.name }))
288292
}
289293

290-
const result = await response.json()
291-
setIngestionResults(prev => [...prev, result])
294+
const result: IngestionResult = await response.json()
292295

293-
if (result.status === "partial" && result.failed_step) {
294-
throw new Error(result.message || t("kb.errors.failedAtStep", { step: result.failed_step }))
296+
if (result.status === "error") {
297+
throw new Error(result.message || t("kb.errors.uploadFailedFile", { name: file.name }))
298+
}
299+
if (result.status === "partial") {
300+
toast.warning(
301+
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
302+
)
303+
}
304+
if (result.status === "success" && (result.embedding_count ?? 0) === 0 && (result.chunks_count ?? 0) > 0) {
305+
toast.error(
306+
"文档上传成功,但 embedding 生成失败(" + result.chunks_count + " 个 chunks 未生成 embedding)。文档无法被搜索。请检查 embedding 模型配置和 API 状态。" +
307+
(result.warnings?.length ? ` 警告: ${result.warnings.join(", ")}` : "")
308+
)
295309
}
296310

311+
setIngestionResults(prev => [...prev, result])
297312
successfulCollections.push(collectionName)
298313
setUploadProgress(((i + 1) / selectedFiles.length) * 100)
299314
}
@@ -370,6 +385,16 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
370385
}
371386

372387
const result: WebIngestionResult = await response.json()
388+
389+
if (result.status === "error") {
390+
throw new Error(result.message || t("kb.errors.webIngestFailed"))
391+
}
392+
if (result.status === "partial") {
393+
toast.warning(
394+
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
395+
)
396+
}
397+
373398
setWebIngestionResult(result)
374399
setWebIngestionProgress(100)
375400

src/xagent/web/api/kb.py

Lines changed: 128 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import concurrent.futures
5+
import contextvars
56
import functools
67
import json
78
import logging
@@ -102,6 +103,50 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
102103
# Create router
103104
kb_router = APIRouter(prefix="/api/kb", tags=["kb"])
104105

106+
# Shared executor for ingestion tasks to prevent global thread pool exhaustion
107+
_ingest_executor = concurrent.futures.ThreadPoolExecutor(
108+
max_workers=10, thread_name_prefix="ingest_worker_"
109+
)
110+
111+
112+
def shutdown_ingest_executor() -> None:
113+
"""Shutdown the shared ingestion executor gracefully.
114+
115+
Called during application shutdown to ensure pending tasks complete
116+
and resources are properly released. Uses a timeout to prevent blocking
117+
application shutdown indefinitely.
118+
"""
119+
logger.info("Shutting down ingestion executor...")
120+
try:
121+
# Executor.shutdown() accepts no timeout, so run it in a helper thread and wait on a threading.Event with a timeout instead
122+
import threading
123+
124+
shutdown_complete = threading.Event()
125+
126+
def wait_for_shutdown() -> None:
127+
_ingest_executor.shutdown(wait=True)
128+
shutdown_complete.set()
129+
130+
shutdown_thread = threading.Thread(target=wait_for_shutdown, daemon=True)
131+
shutdown_thread.start()
132+
133+
# Wait for shutdown with timeout
134+
if shutdown_complete.wait(timeout=30):
135+
logger.info("Ingestion executor shutdown complete")
136+
else:
137+
logger.warning(
138+
"Executor shutdown timed out after 30s; forcing shutdown. "
139+
"Some ingestion tasks may be incomplete."
140+
)
141+
_ingest_executor.shutdown(wait=False)
142+
except Exception as e:
143+
logger.error("Error during executor shutdown: %s", e)
144+
# Force shutdown on any error
145+
try:
146+
_ingest_executor.shutdown(wait=False)
147+
except Exception:
148+
pass # Ignore errors during forced shutdown
149+
105150

106151
class CloudFile(BaseModel):
107152
provider: str
@@ -427,19 +472,18 @@ async def ingest(
427472

428473
progress_manager = get_progress_manager()
429474

430-
def _run_ingestion() -> IngestionResult:
431-
return run_document_ingestion(
475+
result: IngestionResult = await asyncio.get_running_loop().run_in_executor(
476+
_ingest_executor,
477+
lambda: contextvars.copy_context().run(
478+
run_document_ingestion,
432479
collection=collection,
433480
source_path=str(file_path),
434481
ingestion_config=config,
435482
progress_manager=progress_manager,
436483
user_id=int(_user.id),
437484
is_admin=bool(_user.is_admin),
438-
)
439-
440-
with concurrent.futures.ThreadPoolExecutor() as executor:
441-
future = executor.submit(_run_ingestion)
442-
result: IngestionResult = future.result()
485+
),
486+
)
443487

444488
if result.status == "error":
445489
return JSONResponse(status_code=500, content=result.model_dump())
@@ -545,14 +589,17 @@ def _download_file() -> None:
545589

546590
# Run ingestion (blocking)
547591
try:
548-
result = await asyncio.to_thread(
549-
run_document_ingestion,
550-
collection=request.collection,
551-
source_path=str(file_path),
552-
ingestion_config=config,
553-
progress_manager=progress_manager,
554-
user_id=int(_user.id),
555-
is_admin=bool(_user.is_admin),
592+
result = await asyncio.get_running_loop().run_in_executor(
593+
_ingest_executor,
594+
lambda: contextvars.copy_context().run(
595+
run_document_ingestion,
596+
collection=request.collection,
597+
source_path=str(file_path),
598+
ingestion_config=config,
599+
progress_manager=progress_manager,
600+
user_id=int(_user.id),
601+
is_admin=bool(_user.is_admin),
602+
),
556603
)
557604
return result
558605
except Exception as e:
@@ -864,15 +911,16 @@ async def ingest_web(
864911
max_retries: Maximum retry attempts
865912
retry_delay: Delay between retries
866913
"""
914+
# SECURITY: Validate collection name at API boundary
867915
try:
868-
try:
869-
safe_collection = sanitize_path_component(collection, "collection")
870-
except ValueError as e:
871-
logger.warning("Invalid collection name rejected: %s - %s", collection, e)
872-
raise HTTPException(
873-
status_code=422, detail=f"Invalid collection name: {str(e)}"
874-
) from e
916+
safe_collection = sanitize_path_component(collection, "collection")
917+
except ValueError as e:
918+
logger.warning("Invalid collection name rejected: %s - %s", collection, e)
919+
raise HTTPException(
920+
status_code=422, detail=f"Invalid collection name: {str(e)}"
921+
) from e
875922

923+
try:
876924
url_patterns_list = (
877925
[p.strip() for p in url_patterns.split(",")] if url_patterns else None
878926
)
@@ -958,16 +1006,18 @@ async def ingest_web(
9581006
),
9591007
)
9601008

961-
result = await asyncio.get_event_loop().run_in_executor(
962-
None,
963-
lambda: asyncio.run(
1009+
# Run web ingestion
1010+
result = await asyncio.get_running_loop().run_in_executor(
1011+
_ingest_executor,
1012+
lambda: contextvars.copy_context().run(
1013+
asyncio.run,
9641014
run_web_ingestion(
9651015
collection=safe_collection,
9661016
crawl_config=crawl_config,
9671017
ingestion_config=ingestion_config,
9681018
user_id=int(_user.id),
9691019
is_admin=bool(_user.is_admin),
970-
)
1020+
),
9711021
),
9721022
)
9731023

@@ -1321,6 +1371,10 @@ async def delete_document_api(
13211371
This endpoint uses filename lookup which may have a race condition if
13221372
the same filename is uploaded multiple times concurrently. For production
13231373
use, consider using doc_id directly or adding a filename index column.
1374+
1375+
Physical file cleanup is handled at the collection level (when the entire
1376+
collection is deleted). This endpoint only removes the document from
1377+
LanceDB vector storage.
13241378
"""
13251379
# NOTE: Exceptions are normalized by @handle_kb_exceptions for consistent API responses.
13261380
from ...core.tools.core.RAG_tools.LanceDB.schema_manager import (
@@ -1642,33 +1696,66 @@ async def rename_collection_api(
16421696
physical_rename_error = f"Path resolution error: {str(e)}"
16431697

16441698
# Step 2: Update collection name in all tables
1699+
# NOTE: LanceDB does not support ACID transactions across multiple tables.
1700+
# We use a best-effort approach: validate all tables are accessible first,
1701+
# then perform updates. If any update fails, we record the failure but cannot
1702+
# roll back previously completed updates.
16451703
table_names = _list_table_names(conn, warnings)
16461704

1705+
# Pre-flight validation: ensure all target tables are accessible
1706+
tables_to_update = []
16471707
for table_name in ["documents", "parses", "chunks"]:
16481708
if table_name in table_names:
1649-
try:
1650-
table = conn.open_table(table_name)
1651-
table.update(
1652-
f"collection = '{escape_lancedb_string(collection_name)}'",
1653-
{"collection": new_name},
1654-
)
1655-
except Exception as e:
1656-
logger.warning("Failed to update '%s': %s", table_name, e)
1657-
warnings.append(f"Failed to update '{table_name}': {e}")
1709+
tables_to_update.append(table_name)
16581710

16591711
for table_name in table_names:
1660-
if not table_name.startswith("embeddings_"):
1661-
continue
1712+
if table_name.startswith("embeddings_"):
1713+
tables_to_update.append(table_name)
1714+
1715+
# Validate all tables can be opened
1716+
validation_errors = []
1717+
for table_name in tables_to_update:
1718+
try:
1719+
conn.open_table(table_name)
1720+
except Exception as e:
1721+
validation_errors.append(f"Cannot access table '{table_name}': {e}")
1722+
1723+
if validation_errors:
1724+
# Fail fast if any table is inaccessible
1725+
error_msg = f"Cannot rename collection: {', '.join(validation_errors)}"
1726+
logger.error("Rename validation failed: %s", error_msg)
1727+
raise HTTPException(
1728+
status_code=500,
1729+
detail=error_msg,
1730+
)
1731+
1732+
# Perform all updates (best-effort, no rollback)
1733+
update_success = {}
1734+
update_failed = {}
1735+
1736+
for table_name in tables_to_update:
16621737
try:
16631738
table = conn.open_table(table_name)
16641739
table.update(
1665-
f"collection = '{escape_lancedb_string(collection_name)}'",
1740+
f"collection = '{escape_lancedb_string(collection_name)}' AND user_id = {int(_user.id)}",
16661741
{"collection": new_name},
16671742
)
1743+
update_success[table_name] = True
16681744
except Exception as e:
1669-
logger.warning("Failed to update embeddings table '%s': %s", table_name, e)
1745+
update_failed[table_name] = str(e)
1746+
logger.error("Failed to update table '%s': %s", table_name, e)
16701747
warnings.append(f"Failed to update '{table_name}': {e}")
16711748

1749+
# Log summary of update results
1750+
if update_failed:
1751+
logger.warning(
1752+
"Collection rename update summary: %d succeeded, %d failed. Success: %s, Failed: %s",
1753+
len(update_success),
1754+
len(update_failed),
1755+
list(update_success.keys()),
1756+
list(update_failed.keys()),
1757+
)
1758+
16721759
# Migrate ingestion status from old collection name to new
16731760
try:
16741761
status_entries = load_ingestion_status(collection=collection_name)
@@ -1734,11 +1821,13 @@ async def rename_collection_api(
17341821
"status": final_status,
17351822
"message": final_message,
17361823
"warnings": warnings,
1824+
"physical_move": physical_rename_status,
17371825
}
17381826

17391827
return {
17401828
"status": "success",
17411829
"message": f"Collection renamed from '{collection_name}' to '{new_name}'",
1830+
"physical_move": physical_rename_status,
17421831
}
17431832

17441833

src/xagent/web/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@ async def shutdown_event() -> None:
212212
if sandbox_mgr:
213213
await sandbox_mgr.cleanup()
214214

215+
# Shutdown shared ingestion executor
216+
from .api.kb import shutdown_ingest_executor
217+
218+
shutdown_ingest_executor()
219+
logger.info("Ingestion executor shutdown completed")
220+
215221

216222
# Frontend is now served by Next.js at http://localhost:3000
217223
# This backend only provides API endpoints

0 commit comments

Comments
 (0)