Skip to content

feat(kb): [WIP]overhaul knowledge base workflows and retrieval#8630

Open
lxfight wants to merge 37 commits into
AstrBotDevs:masterfrom
lxfight:enhance/kb-stability-and-features
Open

feat(kb): [WIP]overhaul knowledge base workflows and retrieval#8630
lxfight wants to merge 37 commits into
AstrBotDevs:masterfrom
lxfight:enhance/kb-stability-and-features

Conversation

@lxfight
Copy link
Copy Markdown
Member

@lxfight lxfight commented Jun 6, 2026

Improve the knowledge base feature across backend storage, retrieval quality, upload/import workflows, dashboard usability, and regression coverage.

Changes

Modifications / 改动点

  • Add and harden KB document workflows, including batch delete, document search, upload/import progress, partial failure reporting, and safer rollback behavior.

  • Improve retrieval behavior with kb_ids filtering, HNSW support, cosine/IP FAISS migration, parallel dense retrieval, sparse retrieval fixes, RRF cleanup, and BM25 memory protection.

  • Add storage and migration safeguards, including FAISS index backup, external ID preservation, incremental index_type migration, KB-scoped stats, cascade deletion, generated kb_id column, name index, and initialization retry/locking.

  • Preserve more Markdown structure during chunking.

  • Improve dashboard KB pages, including overview/detail/document layouts, document search, upload UX, validation feedback, failed upload surfacing, task status handling, and localized text.

  • Add dashboard tests and consolidate Python KB regression coverage into tests/test_knowledge_base.py.

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

  • uv run ruff format .
  • uv run ruff check .
  • uv run pytest tests/test_knowledge_base.py -q
  • node --test dashboard/tests/knowledgeBase.test.mjs

Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txtpyproject.toml 文件相应位置。

  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

lxfight added 30 commits June 4, 2026 15:21
The previous implementation called vec_db.count_documents() without any
metadata filter, which counted chunks across ALL knowledge bases instead
of just the target KB. This caused each KB's chunk_count statistic to
reflect the total across all KBs, producing garbled stats.

Fix: pass metadata_filter={'kb_id': kb_id} so only chunks belonging to
the requested knowledge base are counted.
FTS5 bm25() uses lower-is-better (0=perfect) while BM25Okapi fallback
uses higher-is-better. When merged with reverse sort, all BM25 results
systematically outranked FTS5 results regardless of relevance.

RRF only uses rank positions, not score magnitudes — so this fix simply
ensures both paths share the same sort direction.

Changes:
- FTS5: clamp negative bm25() values to 0, sort ascending
- BM25 fallback: negate scores, sort ascending
- Add debug logging for sparse top-5 and RRF top-5 scores
When vec_db.insert_batch() succeeds but the subsequent metadata commit
fails, vectors are already persisted in FAISS and DocumentStorage but
have no corresponding KBDocument record, creating orphan data.

Fix: track vectors_stored flag, and in the exception handler call
vec_db.delete_documents(metadata_filters={'kb_doc_id': doc_id}) to
clean up both FAISS embeddings and document storage rows.

The sys.modules workaround in tests breaks the circular import chain
(kb_helper → provider.manager → kb_mgr → provider.manager).
KnowledgeBaseRoute is a process-lifetime singleton; its upload_tasks
and upload_progress dicts grew without bound because the cleanup code
in get_upload_progress was commented out.

Two-tier cleanup:
1. Immediate: remove task on poll when completed/failed
2. Safety net: background tasks schedule delayed cleanup (5 min)
   via _schedule_delayed_cleanup, guarding against clients that
   never poll get_upload_progress

Also adds _cleanup_task() helper — idempotent, uses pop(key, None).
Adds POST /api/kb/document/batch-delete endpoint for deleting multiple
documents in a single request (max 100 per call).

Implementation:
- kb_db_sqlite: delete_documents_by_ids() uses single SQL IN clause
  for kb.db delete, then asyncio.gather for parallel vec_db cleanup
- kb_helper: delete_documents() calls kb_db, then updates stats once
- Best-effort semantics: one vec_db failure doesn't block others

Previously, deleting N documents required N HTTP round-trips and N
separate update_kb_stats calls. Now it's 1 request + 1 stat update.
_retrieve_with_bm25 previously loaded ALL documents with no limit
and no kb_id filter, risking OOM on large knowledge bases when FTS5
is unavailable.

Changes:
- Pass metadata_filters={'kb_id': kb_id} to scope the query
- Cap loaded documents at MAX_BM25_DOCS=10,000 per KB
- Log warning when cap is hit so operators know FTS5 needs attention
EmbeddingStorage.save_index() called faiss.write_index() directly,
which is synchronous I/O and blocks the asyncio event loop. For
large FAISS indexes this can freeze the entire server for seconds.

Fix: wrap faiss.write_index in asyncio.to_thread() so the blocking
I/O runs on a thread pool, keeping the event loop responsive.

Called from insert(), insert_batch(), and delete() — every vector
mutation path.
- IndexFlatL2 → IndexFlatIP: stored vectors now L2-normalized at insert,
  query vectors normalized at search, IP = cosine similarity
- Score formula: (scores + 1.0) / 2.0 maps IP [-1,1] → [0,1]
- Add _safe_normalize_l2() with zero-vector detection (raises ValueError)
- Add asyncio.Lock serializing all FAISS write operations (insert/batch/delete/save)
- Auto-migrate legacy L2 indexes on load (reconstruct + normalize + rebuild)
- Validate dimension match when loading existing index
- Add comprehensive tests: normalization, IP scores, write lock, L2→IP migration,
  dimension mismatch, zero-vector guard, HNSW create/search/persistence
Previously top_k_sparse was summed across KBs, causing sparse results
to dominate RRF fusion. Now each KB's BM25 results are truncated to
its own top_k_sparse before merging, and the global cap uses max().

Add test verifying per-KB truncation with 2 KBs and different limits.
- Add kb_id generated column with index to avoid full table scans
  on json_extract(metadata, '$.kb_id') for metadata filtering
- Extend generated-column optimization to count_documents()
- Parallelize _dense_retrieve across KBs via asyncio.gather
- Support index_type='hnsw' via faiss.index_factory('HNSW32', METRIC_INNER_PRODUCT)
- KnowledgeBase model gains index_type field (flat|hnsw)
- EmbeddingCache: async-locked LRU cache keyed by SHA256(text)
  used in insert_batch and retrieve to skip redundant API calls
- Cache integration validates returned vector count before indexing
- Add tests: cache hit/miss, LRU eviction, insert_batch cache usage
The old migration used np.arange(ntotal) which assigned new sequential
IDs, breaking the mapping between FAISS vectors and DocumentStorage
chunks. Extract original external IDs from IndexIDMap and re-register
them with the new IP index.

Also broadens L2 detection from isinstance(IndexFlatL2) to metric_type
check, covering any L2 variant.
Returning None made it impossible for callers to distinguish
'successfully deleted' from 'chunk not found'. Now returns False
when the chunk is missing, True on successful deletion.
…n None on limit<=0

- Add _fts_rebuild_lock to prevent concurrent FTS5 index rebuilds from
  corrupting the contentless-delete table.
- Switch generated columns from STORED to VIRTUAL so ALTER TABLE is O(1)
  on existing large tables (index still materializes the computed value).
- Extract _apply_metadata_filters helper to reduce duplication across
  get_documents / delete_documents / count_documents.
- Return None from search_sparse when limit <= 0 instead of empty list,
  so callers correctly fall back to in-memory BM25.
…tion with kb_id

- Add _ensure_column helper for safe incremental schema migrations.
- Add index_type column migration in migrate_to_v1 for legacy databases.
- Add kb_id parameter to delete_document_by_id / delete_documents_by_ids
  to prevent accidental cross-KB deletion.
- Reorder batch delete: clean vectors first, then remove metadata only
  for successful vec_db deletions (best-effort per document).
- Also cascade-delete KBMedia rows alongside KBDocument rows.
…regation

- Rename vec_doc_id_to_dense → chunk_id_to_dense in rank_fusion to
  reflect that Document.doc_id stores the chunk UUID, not a doc ID.
- Replace fragile suffix-slicing (merged_results[-len(kb_chunks):])
  with an explicit kb_results accumulator in BM25 fallback path,
  keeping per-KB truncation self-contained.
…e, track init retries

- Wrap RateLimiter time checks in asyncio.Lock for accurate throttling
  when used concurrently via asyncio.gather.
- Raise ValueError from delete_chunk/delete_document when the target
  chunk or document does not exist, instead of silently succeeding.
- Pass kb_id to delete_documents_by_ids for cross-KB safety.
- Add init_retry_count and last_init_retry_at fields on KBHelper to
  support cooldown-gated re-initialization in KnowledgeBaseManager.
…d init retry

- Delete KBDocument and KBMedia rows alongside KnowledgeBase when
  removing a KB, preventing orphan metadata records.
- Add asyncio.Lock around kb_insts mutations so get_kb / update_kb /
  delete_kb are serialized and readers never observe a half-swapped
  instance.
- Maintain O(1) kb_name→kb_id index to replace linear scan in
  get_kb_by_name.
- Add cooldown-gated init retry (60 s, max 3 attempts) so transient
  provider outages do not permanently disable a KB.
- Use _UNSET sentinel for rerank_provider_id in update_kb so callers
  can distinguish 'not provided' from 'explicitly set to None'.
- Clean up file directories when create_kb fails after helper init,
  preventing empty orphan directories.
…d validation

- Return a user-visible message when all session-configured KBs are
  unavailable instead of silently returning None.
- Use _UNSET sentinel for rerank_provider_id in the update-kb route so
  omitting the field preserves the existing value while passing null
  explicitly clears it.
- Add update_fields allowlist check to reject requests with no
  recognised update fields.
- Fix _format_failed_doc_error startswith check to require 'filename:'
  prefix, preventing false positives when file name happens to be a
  prefix of the error message.
- Add index_type to the recognised update fields.
…, and migration

- Verify delete_document_by_id rejects documents from a different KB.
- Verify metadata is preserved when vector deletion fails.
- Verify delete_chunk raises ValueError for missing chunks.
- Add integration test for index_type column migration on legacy DB.
- Assert kb_id is forwarded through the helper→db call chain.
…s limit

- Add search parameter to list_documents_by_kb and count_documents_by_kb
  for SQL ILIKE filtering on doc_name and file_type.
- Change list_kbs default limit from 100 to None so callers can request
  all KB records without an artificial ceiling.
- Add KBHelper.count_documents to forward the search filter to kb_db.
…utation

- Accept kb_ids as an alternative to kb_names in retrieve(), so callers
  that already hold KB IDs can skip the name→id lookup.
- Add _validate_kb_options helper to reject invalid chunk/retrieval
  parameters (negative sizes, overlap >= chunk_size, unknown index_type)
  at the manager layer before persisting any state.
- In update_kb, build a candidate_state dict first, validate it, and
  only mutate the KB model after validation passes — this prevents
  partially-updated in-memory state when invalid options are submitted.
…rt search/kb_ids in API

- Add _coerce_optional_int, _validate_chunk_options, _validate_kb_options,
  _validate_upload_options, and _validate_upload_file helpers so every
  numeric parameter is validated before use.
- Enforce ALLOWED_UPLOAD_EXTENSIONS whitelist and 128 MB file size cap.
- Expose kb_ids alongside kb_names in /kb/retrieve so the dashboard can
  query by stable ID.
- Add search and total count to /kb/document/list for paginated search
  in the UI.
- Pre-validate update_kb options against the existing KB state before
  forwarding to the manager layer.
- Resolve t-SNE visualization kb_names from kb_ids when only IDs are
  provided.
…ion feedback in KB UI

- Add search input to DocumentsTab with debounced server-side filtering.
- Show total document count alongside paginated results.
- Improve file upload with better validation feedback, file count
  display, and clear-button for the selection.
- Add upload parameter validation (chunk_size, chunk_overlap, etc.)
  with inline error messages before submitting.
- Surface init_error state on KB cards in the list view.
- Add i18n keys for new UI strings across en-US, zh-CN, and ru-RU.
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, we are unable to review this pull request

The GitHub API does not allow us to fetch diffs exceeding 20000 lines

@dosubot dosubot Bot added size:XXL This PR changes 1000+ lines, ignoring generated files. area:webui The bug / feature is about webui(dashboard) of astrbot. feature:knowledge-base The bug / feature is about knowledge base labels Jun 6, 2026
@lxfight lxfight changed the title fix: [WIP]improve knowledge base ingestion stability feat: [WIP]improve knowledge base ingestion stability Jun 6, 2026
@lxfight lxfight changed the title feat: [WIP]improve knowledge base ingestion stability feat(kb): [WIP]overhaul knowledge base workflows and retrieval Jun 6, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces comprehensive upgrades to the knowledge base ingestion, storage, and retrieval pipelines, including FTS5-based document searching, a thread-safe embedding cache, automatic FAISS index migration from L2 to Inner Product, background ingestion task tracking, and index consistency check/repair tools. Review feedback identified critical issues: a swallowed RuntimeError during single-KB dense retrieval due to asyncio.gather catching exceptions, potential SQLite database locking during concurrent batch deletions, a potential TypeError in FAISS index saving when the path is None, and a potential AttributeError in task serialization if timestamp fields are missing.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +749 to +756
results_per_kb = await asyncio.gather(*tasks, return_exceptions=True)

all_results: list[Result] = []
for result in results_per_kb:
if isinstance(result, Exception):
logger.error(f"稠密检索异常: {result}", exc_info=True)
continue
all_results.extend(result)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In _dense_retrieve, asyncio.gather is called with return_exceptions=True. This means any RuntimeError raised inside _retrieve_one (which is explicitly raised when len(kb_ids) == 1) will be caught and returned as an exception object in results_per_kb instead of being raised to the caller. The loop then simply logs the exception and continues, effectively swallowing the error and returning an empty list [] instead of raising the intended RuntimeError.

We should check if len(kb_ids) == 1 and re-raise the exception if it is an instance of Exception.

Suggested change
results_per_kb = await asyncio.gather(*tasks, return_exceptions=True)
all_results: list[Result] = []
for result in results_per_kb:
if isinstance(result, Exception):
logger.error(f"稠密检索异常: {result}", exc_info=True)
continue
all_results.extend(result)
results_per_kb = await asyncio.gather(*tasks, return_exceptions=True)
all_results: list[Result] = []
for result in results_per_kb:
if isinstance(result, Exception):
if len(kb_ids) == 1:
raise result
logger.error(f"稠密检索异常: {result}", exc_info=True)
continue
all_results.extend(result)

Comment on lines +750 to +752
vec_results = await asyncio.gather(
*[_delete_one(doc_id) for doc_id in candidates],
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In delete_documents_by_ids, asyncio.gather is used to concurrently execute _delete_one for all candidate documents. Since _delete_one performs write operations on the SQLite database (via delete_documents), running them concurrently without any rate-limiting or serialization will likely cause SQLite "database is locked" (OperationalError) errors under load or with many documents. It is safer to execute them sequentially or use a semaphore to limit concurrency.

Suggested change
vec_results = await asyncio.gather(
*[_delete_one(doc_id) for doc_id in candidates],
)
vec_results = []
for doc_id in candidates:
vec_results.append(await _delete_one(doc_id))

Comment on lines 256 to +258
if self.index is None:
return
faiss.write_index(self.index, self.path)
await asyncio.to_thread(faiss.write_index, self.index, self.path)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In _save_index_locked, faiss.write_index is called with self.path. However, self.path can be None (as defined in __init__ where path: str | None = None). If self.path is None, calling write_index will raise a TypeError. A guard should be added to ensure self.path is not None before attempting to write the index.

Suggested change
if self.index is None:
return
faiss.write_index(self.index, self.path)
await asyncio.to_thread(faiss.write_index, self.index, self.path)
if self.index is None or self.path is None:
return
await asyncio.to_thread(faiss.write_index, self.index, self.path)

Comment on lines +334 to +335
"created_at": task.created_at.isoformat(),
"updated_at": task.updated_at.isoformat(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In _task_to_dict, task.created_at and task.updated_at are converted to ISO format using .isoformat(). If either of these fields is None (e.g., due to legacy database records or corrupted states), this will raise an AttributeError. Adding a None-check ensures robust defensive programming.

Suggested change
"created_at": task.created_at.isoformat(),
"updated_at": task.updated_at.isoformat(),
"created_at": task.created_at.isoformat() if task.created_at else None,
"updated_at": task.updated_at.isoformat() if task.updated_at else None,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:webui The bug / feature is about webui(dashboard) of astrbot. feature:knowledge-base The bug / feature is about knowledge base size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant