Skip to content

Commit abfac9f

Browse files
committed
fix migration task scheduling and stabilize regression tests
Only create startup migration background tasks when migration is actually needed, and align tests with the new contract and stricter access filtering behavior. This also restores sparse fallback table resolution compatibility and hardens warning-log assertions to avoid flaky failures.
1 parent 5591b5b commit abfac9f

6 files changed

Lines changed: 56 additions & 7 deletions

File tree

src/xagent/core/tools/core/RAG_tools/retrieval/search_sparse.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
SearchWarning,
1111
SparseSearchResponse,
1212
)
13-
from ..LanceDB.model_tag_utils import to_model_tag
1413
from ..LanceDB.schema_manager import _safe_close_table
1514
from ..storage.contracts import FilterCondition, FilterExpression, FilterOperator
1615
from ..storage.factory import (
@@ -223,9 +222,10 @@ def _substring_fallback(
223222
if filters:
224223
query_filters.update(filters)
225224

225+
_table = None
226226
try:
227-
# Avoid opening an extra table handle here; iter_batches will open internally.
228-
table_name = f"embeddings_{to_model_tag(model_tag)}"
227+
# Resolve embeddings table name with legacy fallback support.
228+
_table, table_name = vector_store.open_embeddings_table(model_tag)
229229

230230
for batch in vector_store.iter_batches(
231231
table_name=table_name,
@@ -289,6 +289,8 @@ def _substring_fallback(
289289

290290
except Exception as exc:
291291
logger.error("Substring fallback failed: %s", exc)
292+
finally:
293+
_safe_close_table(_table)
292294

293295
return results
294296

src/xagent/web/services/rag_storage_migration_service.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,50 @@
1616
class RAGStorageMigrationService:
1717
"""Run storage compatibility checks and background migrations."""
1818

19-
async def start_background_migrations(self) -> asyncio.Task[None]:
20-
"""Create a non-blocking background task for storage migrations."""
19+
async def start_background_migrations(self) -> Optional[asyncio.Task[None]]:
20+
"""Create a non-blocking background task only when migration is needed."""
21+
if not self._should_start_background_task():
22+
logger.info("Skipping background LanceDB migration task (not needed)")
23+
return None
2124
return asyncio.create_task(self._run_migrations())
2225

26+
def _should_start_background_task(self) -> bool:
27+
"""Return whether startup should create a migration background task."""
28+
from ...core.tools.core.RAG_tools.LanceDB.schema_manager import (
29+
check_table_needs_migration,
30+
)
31+
from ...core.tools.core.RAG_tools.utils.lancedb_query_utils import (
32+
list_embeddings_table_names,
33+
)
34+
from ...providers.vector_store.lancedb import get_connection_from_env
35+
36+
auto_migrate = os.getenv("LANCEDB_AUTO_MIGRATE", "true").lower() == "true"
37+
if not auto_migrate:
38+
return False
39+
40+
conn = get_connection_from_env()
41+
tables_to_check = [
42+
"chunks",
43+
"documents",
44+
"parses",
45+
"ingestion_runs",
46+
"prompt_templates",
47+
]
48+
49+
if any(check_table_needs_migration(conn, table) for table in tables_to_check):
50+
return True
51+
52+
try:
53+
return any(
54+
check_table_needs_migration(conn, table_name)
55+
for table_name in list_embeddings_table_names(conn)
56+
)
57+
except Exception as e: # noqa: BLE001
58+
logger.warning(
59+
"Could not inspect embeddings tables for migration need: %s", e
60+
)
61+
return False
62+
2363
async def _run_migrations(self) -> None:
2464
"""Run migration checks and backfills in background."""
2565
auto_migrate = os.getenv("LANCEDB_AUTO_MIGRATE", "true").lower() == "true"

tests/core/tools/core/RAG_tools/generate/test_format_generation_prompt.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
)
99

1010
logger = logging.getLogger(__name__)
11+
PROMPT_LOGGER = "xagent.core.tools.core.RAG_tools.generate.format_generation_prompt"
1112

1213

1314
@pytest.fixture
@@ -78,7 +79,7 @@ def test_format_generation_prompt_empty_contexts_produces_warning_and_formats(
7879
caplog: pytest.LogCaptureFixture,
7980
) -> None:
8081
"""Test that empty formatted contexts produce a warning but still format."""
81-
with caplog.at_level(logging.WARNING):
82+
with caplog.at_level(logging.WARNING, logger=PROMPT_LOGGER):
8283
result = format_generation_prompt(
8384
prompt_template=sample_prompt_template_plain,
8485
formatted_contexts="",

tests/core/tools/core/RAG_tools/pipelines/test_document_search.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ def test_document_search_end_to_end(
190190
search_result = document_search.search_documents(
191191
collection=collection,
192192
query_text=query_text,
193+
user_id=1,
194+
is_admin=True,
193195
config=SearchConfig(
194196
search_type=SearchType.SPARSE,
195197
top_k=3,

tests/core/tools/core/RAG_tools/version_management/test_list_candidates.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
import pytest
99

10-
from xagent.core.tools.core.RAG_tools.core.exceptions import DatabaseOperationError, VersionManagementError
10+
from xagent.core.tools.core.RAG_tools.core.exceptions import (
11+
VersionManagementError,
12+
)
1113
from xagent.core.tools.core.RAG_tools.core.schemas import StepType
1214
from xagent.core.tools.core.RAG_tools.version_management.list_candidates import (
1315
list_candidates,

tests/web/test_rag_storage_migration_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ async def _fake_run() -> None:
2020
called["run"] += 1
2121

2222
monkeypatch.setattr(service, "_run_migrations", _fake_run)
23+
monkeypatch.setattr(service, "_should_start_background_task", lambda: True)
2324

2425
task = await service.start_background_migrations()
26+
assert task is not None
2527
await task
2628

2729
assert isinstance(task, asyncio.Task)

0 commit comments

Comments
 (0)