Feat/kb phase1b rdb metadata migration #219
Draft
sqhyz55 wants to merge 11 commits into
- introduce MetadataStore/VectorIndexStore/KBWriteCoordinator contracts and LanceDB-backed defaults
- refactor RAG management and API paths to use storage abstractions instead of direct LanceDB access
- add storage factory singleton reset hook and update tests to depend on storage layer (including collection manager and collections tests)
- ensure phase 1A keeps doc_id semantics while remaining compatible with future file_id linkage
- Make LanceDBVectorIndexStore connection lazy to avoid early default-dir binding
- Add clear_connection_cache() to reset provider-level cached connections
- Isolate LANCEDB_DIR per test and reset KB storage singleton for clean state
- Add assertion test to ensure default LanceDB directory is not modified
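The lazy-connection change above can be pictured roughly as follows. This is a minimal sketch, not the PR's code: `LazyVectorIndexStore`, the injected `connect` callable, and the module-level `_CONNECTION_CACHE` are hypothetical stand-ins; only `clear_connection_cache()` and `LANCEDB_DIR` are names taken from the commit message.

```python
import os
from typing import Callable, Dict

# Hypothetical provider-level cache: directory path -> connection object.
_CONNECTION_CACHE: Dict[str, object] = {}


def clear_connection_cache() -> None:
    """Drop all cached connections so the next access reconnects."""
    _CONNECTION_CACHE.clear()


class LazyVectorIndexStore:
    """Illustrative store that resolves its directory on first use, not at construction."""

    def __init__(self, connect: Callable[[str], object]) -> None:
        # `connect` is injected so the sketch runs without LanceDB installed.
        self._connect = connect

    @property
    def connection(self) -> object:
        # The directory is read here, at access time, so a test that sets
        # LANCEDB_DIR after constructing the store still gets its own directory.
        path = os.environ["LANCEDB_DIR"]
        if path not in _CONNECTION_CACHE:
            _CONNECTION_CACHE[path] = self._connect(path)
        return _CONNECTION_CACHE[path]
```

Because nothing is opened in `__init__`, importing or constructing the store never touches the default directory; a test can point `LANCEDB_DIR` at a temporary path and call `clear_connection_cache()` during setup.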
Use Hub ID as the single source of truth for metadata, embedding rows, and table naming, and add forward-migration compatibility plus test hardening for sparse search and isolated LanceDB migration tests.
Introduce storage abstraction contracts to decouple API and management layers from direct LanceDB dependencies. This enables future backend migration while maintaining backward compatibility.
Storage Layer:
- Extend MetadataStore with config operations (save/get_collection_config)
- Extend VectorIndexStore with aggregate and delete operations (aggregate_collection_stats, aggregate_document_stats, delete_collection_data)
- Implement all contracts in LanceDBMetadataStore and LanceDBVectorIndexStore
- Add factory singleton (get_kb_write_coordinator) for coordinated access
API Layer (api/kb.py):
- Replace direct get_connection_from_env() calls with storage abstractions
- Remove manual embeddings table updates in rename_collection_api
- Use get_metadata_store() and get_vector_index_store() throughout
Management Layer (management/collections.py):
- Refactor list_collections() to use VectorIndexStore.aggregate_collection_stats()
- Refactor delete_collection() to use VectorIndexStore.delete_collection_data()
- Refactor cancel_collection() to use VectorIndexStore.list_document_records()
- Refactor get_document_stats() to use VectorIndexStore.aggregate_document_stats()
- Remove 324 lines of direct LanceDB operations
Tests:
- Add 90 lines of new storage layer tests
- Update multitenancy tests to mock storage abstractions
- Update kb_dir API tests for new mock patterns
- All 767 RAG tests passing
Phase 1A Constraints Met:
- Interface decoupling only (no physical database split)
- doc_id maintained as primary key
- Backward compatibility via get_raw_connection() on all contracts
- No changes to existing data schemas
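For readers unfamiliar with the contract-plus-default-implementation pattern described here, a rough sketch follows. The method names `save_collection_config`, `get_collection_config`, and `get_raw_connection` come from the commit message; their signatures, the use of `typing.Protocol`, and the `InMemoryMetadataStore` class are assumptions made for illustration only.

```python
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class MetadataStore(Protocol):
    """Assumed shape of the contract; the real signatures may differ."""

    def save_collection_config(self, collection: str, config: Dict[str, Any]) -> None: ...

    def get_collection_config(self, collection: str) -> Optional[Dict[str, Any]]: ...

    def get_raw_connection(self) -> Any: ...


class InMemoryMetadataStore:
    """Hypothetical backend used only to show that callers depend on the contract."""

    def __init__(self) -> None:
        self._configs: Dict[str, Dict[str, Any]] = {}

    def save_collection_config(self, collection: str, config: Dict[str, Any]) -> None:
        # Store a copy so later caller-side mutation does not leak into the store.
        self._configs[collection] = dict(config)

    def get_collection_config(self, collection: str) -> Optional[Dict[str, Any]]:
        return self._configs.get(collection)

    def get_raw_connection(self) -> Any:
        # Escape hatch for legacy callers; here it simply exposes the dict.
        return self._configs


def describe(store: MetadataStore, collection: str) -> str:
    """Caller code written against the contract, not a concrete backend."""
    config = store.get_collection_config(collection)
    return "unconfigured" if config is None else f"configured: {sorted(config)}"
```

Swapping the backend then means providing another class with the same methods; `describe` and similar callers stay unchanged.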
…te migration
Add PostgreSQL-backed metadata storage with multi-user isolation, file ID linkage,
and document staging capabilities while maintaining LanceDB compatibility.
**Storage Layer:**
- Add PostgreSQLMetadataStore for control-plane metadata operations
- Add RDB ORM models (KBCollectionMetadata, KBCollectionShare,
KBDocumentStaging, KBCollectionConfig)
- Add CollectionPermissionChecker for owner/shared-user access control
- Update LanceDBMetadataStore schema with Phase 1B fields (owner_user_id,
external_file_id)
**Dual-Write Coordinator:**
- Add DualWriteCoordinator for safe LanceDB to PostgreSQL migration
- Support write modes: lancedb, postgresql, both
- Implement reconcile and backfill operations for data migration
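A dual-write coordinator of the kind listed above might behave roughly like the sketch below. It uses in-memory dictionaries in place of LanceDB and PostgreSQL; `DictStore`, `DualWriteSketch`, and the exact semantics of `reconcile` and `backfill` are assumptions, since the commit message names the operations but does not define them.

```python
from enum import Enum
from typing import Dict, List, Optional


class WriteMode(str, Enum):
    LANCEDB = "lancedb"
    POSTGRESQL = "postgresql"
    BOTH = "both"


class DictStore:
    """In-memory stand-in for a metadata backend."""

    def __init__(self) -> None:
        self.rows: Dict[str, dict] = {}

    def save(self, name: str, record: dict) -> None:
        self.rows[name] = dict(record)

    def get(self, name: str) -> Optional[dict]:
        return self.rows.get(name)


class DualWriteSketch:
    """Routes writes to one or both stores and reads from a chosen store."""

    def __init__(self, lancedb: DictStore, postgresql: DictStore,
                 write_mode: WriteMode = WriteMode.LANCEDB,
                 read_backend: str = "lancedb") -> None:
        self._stores = {"lancedb": lancedb, "postgresql": postgresql}
        self.write_mode = write_mode
        self.read_backend = read_backend

    def save(self, name: str, record: dict) -> None:
        if self.write_mode is WriteMode.BOTH:
            targets = list(self._stores)
        else:
            targets = [self.write_mode.value]
        for target in targets:
            self._stores[target].save(name, record)

    def get(self, name: str) -> Optional[dict]:
        return self._stores[self.read_backend].get(name)

    def reconcile(self) -> List[str]:
        """Return names whose records differ (or are missing) between the stores."""
        left = self._stores["lancedb"].rows
        right = self._stores["postgresql"].rows
        return sorted(n for n in set(left) | set(right) if left.get(n) != right.get(n))

    def backfill(self) -> int:
        """Copy records that exist only in LanceDB into PostgreSQL; return the count."""
        source, dest = self._stores["lancedb"], self._stores["postgresql"]
        missing = [n for n in source.rows if n not in dest.rows]
        for name in missing:
            dest.save(name, source.rows[name])
        return len(missing)
```

A plausible migration order under these assumptions is: write to `lancedb` only, switch to `both`, run `backfill` until `reconcile` reports nothing, then move reads to `postgresql`.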
**API Layer (9 new endpoints):**
- POST /collections/{collection}/share - Share collection (read-only)
- DELETE /collections/{collection}/share - Remove sharing
- GET /collections/shared-with-me - List shared collections
- POST /collections/{collection}/documents/register - Stage document
- POST /collections/{collection}/process - Queue for processing
- GET /collections/{collection}/documents/staged - List staged
- GET /collections/{collection}/documents/{doc_id}/status - Get status
- POST /collections/{collection}/documents/{doc_id}/retry - Retry failed
- POST /collections/clone - Clone collection metadata/config
**Testing:**
- 50 test cases covering PostgreSQL store, permissions, and dual-write
- Add development verification script (verify_pg_migration.py)
- All existing tests pass with backward compatibility
**Configuration:**
- RAG_METADATA_STORE_BACKEND: lancedb|postgresql (default: lancedb)
- RAG_DUAL_WRITE_ENABLED: true|false (default: false)
- RAG_READ_BACKEND: lancedb|postgresql (default: lancedb)
- RAG_WRITE_BACKEND: lancedb|postgresql|both (default: lancedb)
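As an illustration of how these four variables and their defaults could be read, here is a small sketch. The variable names, allowed values, and defaults are taken from the list above; `StorageSettings`, `load_storage_settings`, and the choice to raise `ValueError` on unknown values are assumptions, not the PR's factory code.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional, Set


@dataclass(frozen=True)
class StorageSettings:
    metadata_store_backend: str
    dual_write_enabled: bool
    read_backend: str
    write_backend: str


def _choice(env: Mapping[str, str], key: str, allowed: Set[str], default: str) -> str:
    """Read one variable, normalise case, and reject values outside `allowed`."""
    value = env.get(key, default).strip().lower()
    if value not in allowed:
        raise ValueError(f"{key} must be one of {sorted(allowed)}, got {value!r}")
    return value


def load_storage_settings(env: Optional[Mapping[str, str]] = None) -> StorageSettings:
    """Build settings from the environment (or any mapping, for tests)."""
    env = os.environ if env is None else env
    backends = {"lancedb", "postgresql"}
    return StorageSettings(
        metadata_store_backend=_choice(env, "RAG_METADATA_STORE_BACKEND", backends, "lancedb"),
        dual_write_enabled=_choice(env, "RAG_DUAL_WRITE_ENABLED", {"true", "false"}, "false") == "true",
        read_backend=_choice(env, "RAG_READ_BACKEND", backends, "lancedb"),
        write_backend=_choice(env, "RAG_WRITE_BACKEND", backends | {"both"}, "lancedb"),
    )
```

With no variables set, every field falls back to the LanceDB defaults, which matches the stated goal that existing deployments keep working unchanged.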
**schemas.py:**
- Add Phase 1B fields to CollectionInfo (owner_user_id, external_file_id)
- Add StageDocumentRequest/Response for document staging
- Add 9 new API response models for sharing, staging, cloning
- Update from_storage() to handle missing Phase 1B fields for backward compatibility
**storage/factory.py:**
- Add dual-write mode support via environment variables
- Add get_dual_write_stats() for monitoring migration
- Support DUAL_WRITE_ENABLED, READ_BACKEND, WRITE_BACKEND env vars
**storage/lancedb_stores.py:**
- Update ensure_collection_metadata_table() schema with Phase 1B fields
- Add owner_user_id and external_file_id to LanceDB collection_metadata table
**api/kb.py:**
- Add 9 Phase 1B API endpoints for collection sharing, document staging, and collection cloning
- Integrate CollectionPermissionChecker for authorization
- All endpoints use PostgreSQL metadata store when available
This commit addresses 5 critical issues identified in code review:
1. Fix MetadataBackend enum integration in dual-write coordinator
   - Replace primary/secondary string naming with MetadataBackend enum
   - Update set_read_backend() to properly switch read backend
   - Fix reconcile/backfill to use enum constants
2. Fix fake async in PostgreSQLMetadataStore
   - Migrate from sync create_engine to create_async_engine
   - All DB operations now truly non-blocking with AsyncSession
   - Update get_raw_connection contract documentation
3. Fix read_backend configuration
   - Add MetadataBackend enum for type-safe backend selection
   - Ensure set_read_backend() actually switches the backend
   - Update DualWriteMetadataStore to support dynamic switching
4. Fix backfill_all_collections import path
   - Correct import from management.collections instead of LanceDB
   - Handle ListCollectionsResult return type properly
5. Fix CollectionPermissionChecker type annotations
   - Change from bare 'type' to Callable[[], Session]
   - Improve type safety and documentation
Testing improvements:
- Update all tests to use MetadataBackend enum
- Fix mock configurations for async operations
- Add comprehensive test coverage for dual-write scenarios
- 50/50 tests passing
All mypy errors resolved. Type safety improved with proper enum usage.
Phase 1: Storage Contract (Foundation)
- Add get_session_factory() method to MetadataStore contract
- Implement in LanceDBMetadataStore (returns None)
- Implement in PostgreSQLMetadataStore (returns async_sessionmaker)
- Implement in DualWriteMetadataStore (delegates to PG store)
- Replace all 9 getattr(_session_factory) usages in kb.py with contract method
Phase 2: Security (Error Messages)
- Replace all 20 occurrences of detail=str(e) with generic messages
- PermissionError returns: "You do not have permission..."
- Generic errors return context-specific messages like "Failed to share collection..."
- Exception details still logged server-side via exc_info=True
Phase 3: HTTP Status Codes
- Change not found returns from HTTP 200+status="error" to HTTP 404
- Change "PostgreSQL not available" from HTTP 200 to HTTP 503
- Fix retry_document to return HTTP 400 for invalid state (not failed)
Phase 4: Data Consistency
- Add UniqueConstraint(collection, doc_id) to KBDocumentStaging model
- Prevents duplicate document inserts and multiple status records
Phase 5: API Contract Validation
- Add path/body collection validation to register_document endpoint
- Add path/body collection validation to process_documents endpoint
- Fix clone_collection to check existence before creating (returns 409 if exists)
- Fix clone_collection to copy external_file_id field (Phase 1B)
Tests: All 30 PostgreSQL tests pass, all 20 dual-write tests pass
Lint: ruff, mypy, pre-commit all pass
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
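The error-handling rules in Phases 2, 3, and 5 amount to a mapping from failure type to status code plus a generic client message. The sketch below shows one way to express that mapping without a web framework. The exception classes, the `to_http_error` helper, and the message strings are hypothetical placeholders (the PR's real messages are truncated in the commit text), and the 403 for `PermissionError` and 500 fallback are assumptions; the 404, 503, 400, and 409 codes are the ones named above.

```python
import logging
from typing import List, Tuple, Type

logger = logging.getLogger("kb_api_sketch")


# Hypothetical exception types standing in for the API's failure cases.
class CollectionNotFound(Exception): ...
class MetadataBackendUnavailable(Exception): ...
class InvalidDocumentState(Exception): ...
class CollectionAlreadyExists(Exception): ...


# (exception type, HTTP status, placeholder client-facing message)
_STATUS_BY_ERROR: List[Tuple[Type[Exception], int, str]] = [
    (PermissionError, 403, "Permission denied."),            # status is an assumption
    (CollectionNotFound, 404, "Collection not found."),
    (MetadataBackendUnavailable, 503, "Metadata backend unavailable."),
    (InvalidDocumentState, 400, "Document is not in a retryable state."),
    (CollectionAlreadyExists, 409, "Target collection already exists."),
]


def to_http_error(exc: Exception, fallback_detail: str) -> Tuple[int, str]:
    """Return (status, detail) for the client; full details go to the server log only."""
    # Passing the exception instance as exc_info logs its traceback server-side.
    logger.error("KB request failed", exc_info=exc)
    for error_type, status, detail in _STATUS_BY_ERROR:
        if isinstance(exc, error_type):
            return status, detail
    # Unknown failures get a context-specific but generic message, never str(exc).
    return 500, fallback_detail
```

In an endpoint, the returned pair would be handed to whatever the framework uses to raise an HTTP error; the point of the sketch is that `str(exc)` never reaches the response body.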
ruff formatter removed unused Callable import that was added during get_session_factory() implementation but not actually used.
Fix critical issues where AsyncSession was being used synchronously,
which would cause failures in real PostgreSQL environments.
P0-1: Convert all Phase 1B endpoints to async DB operations
- Change from `session = session_factory()` to `async with session_factory() as session:`
- Change `session.execute(...)` to `await session.execute(...)`
- Change `session.commit()` to `await session.commit()`
- Remove `session.close()` (async with handles this automatically)
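The four changes above describe one pattern: open the session with `async with`, await every database call, and let the context manager close the session. The sketch below shows that shape with a stub session so it runs without SQLAlchemy or a database; `FakeAsyncSession`, `make_session_factory`, and `share_collection` are hypothetical names, and the string "statements" stand in for real queries.

```python
import asyncio
from typing import Callable, List


class FakeAsyncSession:
    """Stub with the same async surface the endpoints rely on."""

    def __init__(self, log: List[str]) -> None:
        self._log = log

    async def __aenter__(self) -> "FakeAsyncSession":
        self._log.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Closing happens here, which is why no explicit session.close() is needed.
        self._log.append("close")
        return False  # do not swallow exceptions

    async def execute(self, statement: str) -> None:
        self._log.append(f"execute:{statement}")

    async def commit(self) -> None:
        self._log.append("commit")


def make_session_factory(log: List[str]) -> Callable[[], FakeAsyncSession]:
    """Return a zero-argument factory, mirroring how a session maker is called."""
    return lambda: FakeAsyncSession(log)


async def share_collection(session_factory: Callable[[], FakeAsyncSession],
                           collection: str, user_id: str) -> None:
    async with session_factory() as session:
        await session.execute(f"share {collection} with {user_id}")
        await session.commit()
```

Running `asyncio.run(share_collection(make_session_factory(log), "docs", "u2"))` records open, execute, commit, and close in that order, and the close step still runs if `execute` raises.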
Affected endpoints:
- /collections/{collection}/share (POST/DELETE)
- /collections/shared-with-me (GET)
- /collections/{collection}/documents/register (POST)
- /collections/{collection}/process (POST)
- /collections/{collection}/documents/staged (GET)
- /collections/{collection}/documents/{doc_id}/status (GET)
- /collections/{collection}/documents/{doc_id}/retry (POST)
- /collections/clone (POST)
P0-2: Create AsyncCollectionPermissionChecker
- New class uses async def for all methods
- Uses `async with session_factory() as session:`
- Uses `await session.execute(...)`
- Maintains same permission logic as sync version
- Includes comprehensive test coverage (14 tests)
Testing:
- All 73 storage tests pass
- 14 new async permission tests added
- ruff and mypy checks pass
P1-3: Fix get_raw_connection() type signature
- Change return type from DBConnection to Any
- Reflects reality that LanceDB returns DBConnection while PostgreSQL returns AsyncEngine
- Add detailed docstring explaining the type variance and usage patterns
- Maintains backward compatibility while being type-honest
P1-4: Optimize list_shared_collections to eliminate N+1 query
- Before: Query all shares (1 query), then query collection metadata for each share (N queries)
- After: Single JOIN query gets shares and collection metadata together
- Performance: one database round-trip instead of N+1
- Query: select(KBCollectionShare, KBCollectionMetadata).join(...)
P2: Add pagination to /documents/staged endpoint
- Add page query parameter (default: 1, min: 1)
- Add page_size query parameter (default: 50, min: 1, max: 1000)
- Calculate total_pages and offset automatically
- Return total_count, page info in response message
- Use func.count() on subquery for accurate total count
- Format: "Found X staged documents (page Y/Z, total: T)"
Testing:
- All 73 storage tests pass
- ruff and mypy checks pass
- Pagination prevents large payload issues
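The pagination arithmetic can be sketched as below. The parameter bounds and the message format come from the commit message; `PageWindow`, `paginate`, `format_message`, and the choice to report one page for an empty result are assumptions made for illustration.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class PageWindow:
    page: int
    page_size: int
    offset: int
    total_pages: int


def paginate(total_count: int, page: int = 1, page_size: int = 50) -> PageWindow:
    """Validate the query parameters and derive the offset and total_pages."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= page_size <= 1000:
        raise ValueError("page_size must be between 1 and 1000")
    # Assumption: an empty result set is reported as a single (empty) page.
    total_pages = max(1, math.ceil(total_count / page_size))
    offset = (page - 1) * page_size
    return PageWindow(page, page_size, offset, total_pages)


def format_message(found: int, window: PageWindow, total_count: int) -> str:
    """Build the response message in the format quoted in the commit."""
    return (f"Found {found} staged documents "
            f"(page {window.page}/{window.total_pages}, total: {total_count})")
```

For example, 120 staged documents with `page=3` and `page_size=50` give an offset of 100 and three pages, so the last page holds the remaining 20 rows.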
No description provided.