Skip to content

Commit f33d76a

Browse files
committed
feat(kb): implement shared ingestion executor and simplify delete operations
Backend changes:
- Add a shared thread pool, _ingest_executor, to prevent global thread pool exhaustion
- Implement graceful shutdown of the ingestion executor with a 30s timeout
- Fix cross-user data pollution in rename_collection_api by adding a user_id filter
- Improve rename_collection_api transaction safety with pre-validation
- Simplify delete_document_api: remove the complex UploadedFile and physical file handling
- Physical file cleanup is now handled at the collection level only

Frontend changes:
- Extend the IngestionResult interface with embedding_count, vector_count, warnings, failed_step
- Show an error toast when embedding_count is 0 but chunk_count > 0
- Improve error handling for both document and web ingestion, with warnings for "partial" status
1 parent 3760680 commit f33d76a

3 files changed

Lines changed: 74 additions & 6 deletions

File tree

frontend/src/components/kb/knowledge-base-creation-dialog.tsx

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@ import { CloudConnectDialog, CloudFile } from "./cloud-connect-dialog"
3333
interface IngestionResult {
3434
collection: string
3535
document_count: number
36-
chunks_count: number
36+
chunk_count: number
37+
embedding_count?: number
38+
vector_count?: number
3739
status: string
3840
message: string
41+
warnings?: string[]
3942
failed_step?: string
4043
}
4144

@@ -349,16 +352,29 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
349352
}))
350353
}
351354

352-
const result = isJsonRecord(parsed.data) ? parsed.data as unknown as IngestionResult : null
355+
const result = isJsonRecord(parsed.data)
356+
? (parsed.data as unknown as IngestionResult)
357+
: null
353358
if (!result) {
354359
throw new Error(t("kb.errors.uploadFailedFile", { name: file.name }))
355360
}
356-
setIngestionResults(prev => [...prev, result])
357361

358-
if (result.status === "partial" && result.failed_step) {
359-
throw new Error(result.message || t("kb.errors.failedAtStep", { step: result.failed_step }))
362+
if (result.status === "error") {
363+
throw new Error(result.message || t("kb.errors.uploadFailedFile", { name: file.name }))
364+
}
365+
if (result.status === "partial") {
366+
toast.warning(
367+
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
368+
)
369+
}
370+
if (result.status === "success" && (result.embedding_count ?? 0) === 0 && (result.chunk_count ?? 0) > 0) {
371+
toast.error(
372+
"文档上传成功,但 embedding 生成失败(" + result.chunk_count + " 个 chunks 未生成 embedding)。文档无法被搜索。请检查 embedding 模型配置和 API 状态。" +
373+
(result.warnings?.length ? ` 警告: ${result.warnings.join(", ")}` : "")
374+
)
360375
}
361376

377+
setIngestionResults(prev => [...prev, result])
362378
successfulCollections.push(collectionName)
363379
setCompletedUploadCount(i + 1)
364380
setUploadProgress(((i + 1) / selectedFiles.length) * 100)
@@ -449,6 +465,16 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
449465
if (!result) {
450466
throw new Error(t("kb.errors.webIngestFailed"))
451467
}
468+
469+
if (result.status === "error") {
470+
throw new Error(result.message || t("kb.errors.webIngestFailed"))
471+
}
472+
if (result.status === "partial") {
473+
toast.warning(
474+
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
475+
)
476+
}
477+
452478
setWebIngestionResult(result)
453479
setWebIngestionProgress(100)
454480

@@ -721,7 +747,7 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
721747
{result.document_count} {t("kb.dialog.fileUpload.processResult.createDocuments")}
722748
</Badge>
723749
<Badge variant="outline" className="text-xs">
724-
{result.chunks_count} {t("kb.dialog.fileUpload.processResult.textChunks")}
750+
{result.chunk_count} {t("kb.dialog.fileUpload.processResult.textChunks")}
725751
</Badge>
726752
</>
727753
)}

src/xagent/web/api/kb.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,42 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
518518
)
519519

520520

521+
def shutdown_ingest_executor(timeout: float = 30.0) -> None:
    """Shut down the shared ingestion executor, waiting at most ``timeout`` seconds.

    Called during application shutdown. The blocking
    ``_ingest_executor.shutdown(wait=True)`` call runs in a helper daemon
    thread so this function can give up after ``timeout`` seconds instead of
    blocking application shutdown indefinitely.

    If the wait times out, the executor is told to stop without waiting and,
    where the runtime supports it, tasks that are still queued are cancelled.
    Tasks that are already running are not interrupted by this function.

    This function never raises: shutdown is best-effort and every failure is
    logged instead.

    NOTE(review): this call blocks the calling thread for up to ``timeout``
    seconds; the caller in app.py invokes it directly from an ``async``
    shutdown handler -- confirm that blocking there is acceptable.

    Args:
        timeout: Maximum number of seconds to wait for in-flight ingestion
            tasks to finish. Defaults to 30 seconds.
    """
    import threading

    def _stop_without_waiting() -> None:
        # ``cancel_futures`` exists on Python 3.9+ executors and drops tasks
        # that are queued but not yet started; without it, "forcing" the
        # shutdown would still let every queued task run.
        # Assumes _ingest_executor is a concurrent.futures executor -- TODO confirm.
        try:
            _ingest_executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            # Older runtimes do not accept ``cancel_futures``.
            _ingest_executor.shutdown(wait=False)

    logger.info("Shutting down ingestion executor...")
    try:
        shutdown_complete = threading.Event()

        def wait_for_shutdown() -> None:
            _ingest_executor.shutdown(wait=True)
            shutdown_complete.set()

        # Daemon thread: if the wait never finishes, this helper must not
        # itself keep the process alive.
        shutdown_thread = threading.Thread(target=wait_for_shutdown, daemon=True)
        shutdown_thread.start()

        if shutdown_complete.wait(timeout=timeout):
            logger.info("Ingestion executor shutdown complete")
        else:
            logger.warning(
                "Executor shutdown timed out after %gs; forcing shutdown. "
                "Some ingestion tasks may be incomplete.",
                timeout,
            )
            _stop_without_waiting()
    except Exception as e:
        # Keep the traceback; a bare message makes shutdown failures hard to diagnose.
        logger.exception("Error during executor shutdown: %s", e)
        try:
            _stop_without_waiting()
        except Exception:
            # Still best-effort, but no longer silent.
            logger.exception("Fallback ingestion executor shutdown failed")
555+
556+
521557
class CloudFile(BaseModel):
522558
provider: str
523559
fileId: str

src/xagent/web/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,12 @@ async def shutdown_event() -> None:
562562
if sandbox_mgr:
563563
await sandbox_mgr.cleanup()
564564

565+
# Shutdown shared ingestion executor
566+
from .api.kb import shutdown_ingest_executor
567+
568+
shutdown_ingest_executor()
569+
logger.info("Ingestion executor shutdown completed")
570+
565571

566572
# Frontend is now served by Next.js at http://localhost:3000
567573
# This backend only provides API endpoints

0 commit comments

Comments
 (0)