Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
63af419
fix(kb): scope chunk_count to current KB in update_kb_stats
lxfight Jun 4, 2026
72f2c5f
fix(kb): unify sparse retrieval score direction to lower-is-better
lxfight Jun 4, 2026
4861351
fix(kb): rollback orphan vectors when metadata save fails after upload
lxfight Jun 4, 2026
6e2c979
fix(sidebar): prevent memory leak in upload_tasks/upload_progress dicts
lxfight Jun 4, 2026
a277a80
feat(kb): add batch delete documents API
lxfight Jun 4, 2026
7f80c32
fix(kb): cap BM25 fallback at 10K chunks to prevent OOM
lxfight Jun 4, 2026
b98f87a
fix(kb): offload faiss.write_index to thread via asyncio.to_thread
lxfight Jun 4, 2026
611a3f2
fix(kb): switch FAISS from L2 to IP cosine similarity with write lock
lxfight Jun 5, 2026
034a628
fix(kb): correct BM25 top_k_sparse aggregation across multiple KBs
lxfight Jun 5, 2026
ae3084b
perf(kb): add kb_id generated column and parallelize dense retrieval
lxfight Jun 5, 2026
191d713
feat(kb): add HNSW index support and thread-safe LRU embedding cache
lxfight Jun 5, 2026
72e6a76
fix(kb): preserve FAISS external IDs during L2→IP index migration
lxfight Jun 5, 2026
c85b3bb
fix(kb): return bool from vec_db.delete to signal whether chunk existed
lxfight Jun 5, 2026
f5927bc
fix(kb): serialize FTS5 rebuild, use VIRTUAL generated columns, retur…
lxfight Jun 5, 2026
af0c499
fix(kb): add incremental index_type migration and guard document dele…
lxfight Jun 5, 2026
b350a1f
refactor(kb): clarify RRF variable names and simplify BM25 per-KB agg…
lxfight Jun 5, 2026
7be8ff2
fix(kb): add rate-limiter lock, validate chunk/doc existence on delet…
lxfight Jun 5, 2026
97529b5
fix(kb): cascade-delete KB records, add instance lock, name index, an…
lxfight Jun 5, 2026
256a082
fix(kb): improve dashboard error messages, rerank sentinel, and uploa…
lxfight Jun 5, 2026
1335c0b
test(kb): add coverage for kb_id-guarded deletion, vec-delete failure…
lxfight Jun 5, 2026
bc295ca
feat(kb): add document search by name/type and remove default list_kb…
lxfight Jun 5, 2026
58fd43d
feat(kb): support kb_ids in retrieve and validate KB options before m…
lxfight Jun 5, 2026
fb25d19
fix(kb): validate upload parameters, add file type/size checks, suppo…
lxfight Jun 5, 2026
ff0a2a4
feat(dashboard): add document search, improved upload UX, and validat…
lxfight Jun 5, 2026
c5000f6
feat(kb): upgrade knowledge base workflows
lxfight Jun 6, 2026
96f6070
fix: back up faiss index before migration
lxfight Jun 6, 2026
fc32513
feat: preserve markdown structures during chunking
lxfight Jun 6, 2026
c4dcbf1
fix: surface failed knowledge base uploads
lxfight Jun 6, 2026
be40c83
style: improve knowledge base document detail layout
lxfight Jun 6, 2026
fd8fa36
style: simplify knowledge base overview layout
lxfight Jun 6, 2026
d079cde
fix: clean up faiss batch insert failures
lxfight Jun 6, 2026
291b7e4
fix: enforce kb document ownership
lxfight Jun 6, 2026
692d8f9
fix: report partial kb task failures
lxfight Jun 6, 2026
a951588
test: consolidate knowledge base regression coverage
lxfight Jun 6, 2026
64f4c33
test: merge knowledge base test files
lxfight Jun 6, 2026
7e8ff4c
test: restore split knowledge base tests
lxfight Jun 6, 2026
e4f2312
test: align kb import failure status expectation
lxfight Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 198 additions & 54 deletions astrbot/core/db/vec_db/faiss_impl/document_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from asyncio import Lock
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
Expand All @@ -17,6 +18,7 @@
build_fts5_or_query,
load_stopwords,
to_fts5_search_text,
tokenize_text,
)

FTS_TABLE_NAME = "documents_fts"
Expand Down Expand Up @@ -58,44 +60,49 @@ def __init__(self, db_path: str) -> None:
self._fts_contentless_delete = False
self._fts_index_ready = False
self._stopwords: set[str] | None = None
self._fts_rebuild_lock = Lock()

async def initialize(self) -> None:
"""Initialize the SQLite database and create the documents table if it doesn't exist."""
await self.connect()
async with self.engine.begin() as conn: # type: ignore
await self._ensure_documents_table(conn)

try:
await conn.execute(
text(
"ALTER TABLE documents ADD COLUMN kb_doc_id TEXT "
"GENERATED ALWAYS AS (json_extract(metadata, '$.kb_doc_id')) STORED",
),
)
await conn.execute(
text(
"ALTER TABLE documents ADD COLUMN user_id TEXT "
"GENERATED ALWAYS AS (json_extract(metadata, '$.user_id')) STORED",
),
)

# Create indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_kb_doc_id ON documents(kb_doc_id)",
),
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_user_id ON documents(user_id)",
),
)
except BaseException:
pass
await self._ensure_generated_columns(conn)

await self._initialize_fts5(conn)
await conn.commit()

async def _table_columns(self, executor, table_name: str) -> set[str]:
result = await executor.execute(text(f"PRAGMA table_xinfo({table_name})"))
return {row[1] for row in result.fetchall()}

async def _ensure_generated_columns(self, executor) -> None:
generated_columns = {
"kb_doc_id": "json_extract(metadata, '$.kb_doc_id')",
"user_id": "json_extract(metadata, '$.user_id')",
"kb_id": "json_extract(metadata, '$.kb_id')",
}
columns = await self._table_columns(executor, "documents")
for column_name, expression in generated_columns.items():
if column_name in columns:
continue
await executor.execute(
text(
f"ALTER TABLE documents ADD COLUMN {column_name} TEXT "
f"GENERATED ALWAYS AS ({expression}) VIRTUAL",
),
)
columns.add(column_name)

index_statements = [
"CREATE INDEX IF NOT EXISTS idx_documents_kb_doc_id "
"ON documents(kb_doc_id)",
"CREATE INDEX IF NOT EXISTS idx_documents_user_id ON documents(user_id)",
"CREATE INDEX IF NOT EXISTS idx_documents_kb_id ON documents(kb_id)",
]
for statement in index_statements:
await executor.execute(text(statement))

async def _ensure_documents_table(self, executor) -> None:
"""Create the document table from the SQLModel definition."""
result = await executor.execute(
Expand Down Expand Up @@ -302,11 +309,11 @@ async def get_documents(

async with self.get_session() as session:
query = select(Document)

for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

if ids is not None and len(ids) > 0:
valid_ids = [int(i) for i in ids if i != -1]
Expand Down Expand Up @@ -468,11 +475,11 @@ async def delete_documents(self, metadata_filters: dict) -> None:

async with self.get_session() as session, session.begin():
query = select(Document)

for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

result = await session.execute(query)
documents = result.scalars().all()
Expand All @@ -499,15 +506,144 @@ async def count_documents(self, metadata_filters: dict | None = None) -> int:
query = select(func.count(col(Document.id)))

if metadata_filters:
for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

result = await session.execute(query)
count = result.scalar_one_or_none()
return count if count is not None else 0

async def search_documents(
self,
query_text: str,
metadata_filters: dict | None = None,
offset: int = 0,
limit: int = 100,
) -> tuple[list[dict], int] | None:
"""Search documents with FTS5 and optional metadata filters.

Returns None when FTS5 is unavailable so callers can choose whether to
fall back to an alternate search strategy.
"""
if limit <= 0:
return [], 0
if not await self.ensure_fts_index():
return None

match_query = build_fts5_or_query(tokenize_text(query_text, self.stopwords))
if not match_query:
return [], 0

metadata_filters = metadata_filters or {}
async with self.get_session() as session:
filters_sql, filter_params = await self._metadata_filter_sql(
session,
metadata_filters,
table_alias="d",
)
where_clause = f"{FTS_TABLE_NAME} MATCH :query"
if filters_sql:
where_clause = f"{where_clause} AND {' AND '.join(filters_sql)}"
params = {
"query": match_query,
"limit": int(limit),
"offset": int(offset),
**filter_params,
}
try:
count_result = await session.execute(
text(
f"""
SELECT count(*)
FROM {FTS_TABLE_NAME}
JOIN documents d ON d.id = {FTS_TABLE_NAME}.rowid
WHERE {where_clause}
""",
),
params,
)
total = int(count_result.scalar_one_or_none() or 0)
result = await session.execute(
text(
f"""
SELECT
d.id AS id,
d.doc_id AS doc_id,
d.text AS text,
d.metadata AS metadata,
d.created_at AS created_at,
d.updated_at AS updated_at,
bm25({FTS_TABLE_NAME}) AS score
FROM {FTS_TABLE_NAME}
JOIN documents d ON d.id = {FTS_TABLE_NAME}.rowid
WHERE {where_clause}
ORDER BY score ASC, d.id ASC
LIMIT :limit
OFFSET :offset
""",
),
params,
)
except Exception as e:
logger.warning(
f"FTS5 document search failed for {self.db_path}: {e}",
)
self.fts5_available = False
return None

rows = result.mappings().all()
return [
{
"id": row["id"],
"doc_id": row["doc_id"],
"text": row["text"],
"metadata": row["metadata"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"score": float(row["score"]),
}
for row in rows
], total

async def _apply_metadata_filters(
self,
session: AsyncSession,
query,
metadata_filters: dict,
):
filters_sql, params = await self._metadata_filter_sql(
session,
metadata_filters,
)
for filter_sql in filters_sql:
query = query.where(text(filter_sql))
if params:
query = query.params(**params)
return query

async def _metadata_filter_sql(
self,
session: AsyncSession,
metadata_filters: dict,
table_alias: str | None = None,
) -> tuple[list[str], dict]:
columns = await self._table_columns(session, "documents")
prefix = f"{table_alias}." if table_alias else ""
filters_sql = []
params = {}
for key, val in metadata_filters.items():
if key in {"kb_id", "kb_doc_id", "user_id"} and key in columns:
filters_sql.append(f"{prefix}{key} = :filter_{key}")
else:
filters_sql.append(
f"json_extract({prefix}metadata, '$.{key}') = :filter_{key}"
)
params[f"filter_{key}"] = val
return filters_sql, params

async def ensure_fts_index(self) -> bool:
"""Ensure the FTS5 sparse index exists and matches the documents table."""
if not self.fts5_available:
Expand All @@ -517,22 +653,30 @@ async def ensure_fts_index(self) -> bool:

assert self.engine is not None, "Database connection is not initialized."

async with self.get_session() as session:
doc_count = await self._count_documents_in_session(session)
fts_count = await self._count_fts_rows(session)
if doc_count == fts_count:
self._fts_index_ready = True
async with self._fts_rebuild_lock:
if self._fts_index_ready:
return True

logger.info(
f"Rebuilding FTS5 sparse index for {self.db_path}: "
f"documents={doc_count}, fts_rows={fts_count}",
)
await self.rebuild_fts_index()
return self.fts5_available
async with self.get_session() as session:
doc_count = await self._count_documents_in_session(session)
fts_count = await self._count_fts_rows(session)
if doc_count == fts_count:
self._fts_index_ready = True
return True

logger.info(
f"Rebuilding FTS5 sparse index for {self.db_path}: "
f"documents={doc_count}, fts_rows={fts_count}",
)
await self._rebuild_fts_index_unlocked()
return self.fts5_available

async def rebuild_fts_index(self) -> None:
"""Rebuild the contentless FTS5 sparse index from documents."""
async with self._fts_rebuild_lock:
await self._rebuild_fts_index_unlocked()

async def _rebuild_fts_index_unlocked(self) -> None:
if not self.fts5_available:
return

Expand Down Expand Up @@ -577,7 +721,7 @@ async def search_sparse(
sparse retrieval implementation.
"""
if limit <= 0:
return []
return None
if not await self.ensure_fts_index():
return None

Expand Down
Loading
Loading