- Add vector_store.py service using langchain-postgres for embedding generation and PGVector storage with batch insert support
- Refactor pipeline.py to merge embedding and storage into a single stage
- Add test_pipeline.py with 4-stage validation (PDF→Chunks→Store→Retrieve)
- Update dependencies: langchain-postgres, langchain-openai, psycopg
- Add DATABASE_URL to .env.example with sync driver documentation

Storage uses the same "document_chunks" collection as retriever.py for compatibility. Batch size defaults to 100 chunks for large PDFs.
🔍 PR Validation Results
Pull request overview
This PR introduces a complete vector storage solution by implementing a new vector_store.py service using LangChain's PGVector integration with OpenAI embeddings. The PR consolidates the previously separate embedding and storage stages into a single operation, removes the placeholder embedding_service.py, and provides a comprehensive end-to-end test script to validate the entire pipeline from PDF processing through vector retrieval.
Key changes:
- Implements vector_store.py with batch processing support for efficient handling of large PDFs
- Refactors pipeline.py to merge embedding generation and database storage into a unified Stage 3
- Adds test_pipeline.py for 4-stage validation (PDF→Documents→Chunks→Store→Retrieve)
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| RAGManager/app/services/vector_store.py | New service implementing PGVector storage with OpenAI embeddings, database URL conversion, and batch insertion functionality |
| RAGManager/app/services/pipeline.py | Refactored to remove placeholder embedding stage and integrate vector storage with database record creation |
| RAGManager/app/services/embedding_service.py | Removed placeholder file that is no longer needed |
| RAGManager/test_pipeline.py | New comprehensive test script with 4-stage pipeline validation and debug output dumps |
| RAGManager/pyproject.toml | Updated dependencies: langchain-postgres to 0.0.14, added langchain-openai, migrated from psycopg2-binary to psycopg[binary]>=3.0.0 |
| RAGManager/.env.example | Enhanced documentation for DATABASE_URL with driver compatibility notes |
| RAGManager/app/agents/nodes/retriever.py | Added missing newline at end of file |
```
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
```
The comment states "Use psycopg2 (sync driver), NOT asyncpg" and the example uses "postgresql+psycopg2://". However, the pyproject.toml dependency was changed from "psycopg2-binary" to "psycopg[binary]>=3.0.0" (psycopg3). This creates a mismatch - the documentation suggests psycopg2 but the dependency is psycopg3. Consider updating the comment to clarify that while the DATABASE_URL should use the psycopg2 format, the vector_store will automatically convert it to psycopg3 format internally, or update the example to show "postgresql://" without a driver specification.
Suggested change:

```diff
-# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
-# LangChain PGVector will automatically convert this to psycopg3 format internally.
-DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
+# NOTE: Use a synchronous PostgreSQL URL (do NOT use async drivers like asyncpg).
+# LangChain PGVector / SQLAlchemy will handle the appropriate psycopg3 driver internally.
+DATABASE_URL=postgresql://postgres:postgres@postgres:5432/vectordb
```
```python
logger.info(f" Pages extracted: {len(documents)}")
logger.info(f" Chunks created: {len(chunks)}")
logger.info(f" Chunks stored: {chunks_stored}")
logger.info(f" Chunks retrieved: {len(retrieved)}")
```
The variable 'retrieved' is referenced in the summary but may not be defined if Stage 4 encounters an exception before it's initialized. This would cause a NameError when trying to log the summary. Consider initializing 'retrieved = []' before Stage 4 begins.
```python
logger.info(f" Chunks stored: {chunks_stored}")
logger.info(f" Chunks retrieved: {len(retrieved)}")
logger.info(f" Document ID: {document_id}")
logger.info(f" Avg chunk size: {avg_size:.0f} characters")
```
The variable 'avg_size' is used in the summary but is only defined within the try block in Stage 2 (line 157). If Stage 2 fails or if execution skips Stage 2, this will cause a NameError when trying to log the summary. Initialize 'avg_size = 0' before Stage 2 to ensure it's always defined.
```python
logger.info(f" Chunks created: {len(chunks)}")
logger.info(f" Chunks stored: {chunks_stored}")
logger.info(f" Chunks retrieved: {len(retrieved)}")
logger.info(f" Document ID: {document_id}")
```
The variable 'document_id' is used in the summary but is only defined within the try block in Stage 3 (line 176). If Stage 3 fails or is skipped, this will cause a NameError when trying to log the summary. Initialize 'document_id = None' before Stage 3 to ensure it's always defined.
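The three NameError comments above all point at the same pattern. A minimal sketch of the defensive initialization they suggest, with a simulated stage failure standing in for the real pipeline stages (the stage bodies and variable names mirror the test script but are hypothetical here):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize every summary variable up front so the final log block
# never raises NameError, even when a stage fails before assignment.
retrieved: list = []
avg_size: float = 0
document_id = None
chunks_stored = 0

try:
    # Stage 3 (hypothetical body): store chunks, create the DB record.
    raise RuntimeError("simulated stage failure")
except Exception as exc:
    logger.error(f"Stage 3 failed: {exc}")

# The summary now logs safely regardless of which stage failed.
logger.info(f" Chunks stored: {chunks_stored}")
logger.info(f" Chunks retrieved: {len(retrieved)}")
logger.info(f" Document ID: {document_id}")
logger.info(f" Avg chunk size: {avg_size:.0f} characters")
```

With the defaults in place, a partial run still produces a complete (if zeroed) summary instead of crashing.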
```python
def _convert_database_url_to_psycopg(database_url: str) -> str:
    """
    Convert database URL to postgresql+psycopg format required by langchain-postgres.

    LangChain PGVector requires postgresql+psycopg:// (psycopg3) format.
    This function converts common formats (postgresql://, postgresql+psycopg2://) to the required format.

    Args:
        database_url: Original database URL

    Returns:
        Database URL in postgresql+psycopg:// format
    """
    parsed = urlparse(database_url)

    # Replace driver with psycopg (psycopg3)
    if parsed.scheme.startswith("postgresql"):
        # Remove any existing driver (e.g., +psycopg2)
        base_scheme = "postgresql"
        if "+" in parsed.scheme:
            base_scheme = parsed.scheme.split("+")[0]

        new_scheme = f"{base_scheme}+psycopg"
        new_parsed = parsed._replace(scheme=new_scheme)
        return urlunparse(new_parsed)

    return database_url
```
This function '_convert_database_url_to_psycopg' is duplicated in 'app/agents/nodes/retriever.py' (lines 22-48). The logic is identical. Consider extracting this function to a shared utility module (e.g., 'app/core/database_utils.py' or 'app/utils/database.py') to avoid code duplication and ensure consistency across the codebase.
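A sketch of the extraction the comment suggests. The module path `app/core/database_utils.py` is the reviewer's example, and the function body is lifted from the service code above, condensed but behaviorally identical:

```python
# app/core/database_utils.py (suggested shared location)
from urllib.parse import urlparse, urlunparse


def convert_database_url_to_psycopg(database_url: str) -> str:
    """Normalize any postgresql URL to the postgresql+psycopg://
    (psycopg3) form that langchain-postgres expects."""
    parsed = urlparse(database_url)
    if parsed.scheme.startswith("postgresql"):
        # Drop any existing driver suffix (e.g. +psycopg2), then append +psycopg.
        base_scheme = parsed.scheme.split("+")[0]
        return urlunparse(parsed._replace(scheme=f"{base_scheme}+psycopg"))
    return database_url


# vector_store.py and retriever.py would then both import it:
# from app.core.database_utils import convert_database_url_to_psycopg
```

Keeping one copy means a future change (say, supporting another scheme) lands in both call sites at once.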
```python
def _get_embeddings() -> OpenAIEmbeddings:
    """
    Get OpenAI embeddings instance configured from settings.

    Returns:
        OpenAIEmbeddings instance
    """
    return OpenAIEmbeddings(
        model=settings.embedding_model,
        openai_api_key=settings.openai_api_key,
    )
```
The function '_get_embeddings' creates a new OpenAIEmbeddings instance on every call. In 'app/agents/nodes/retriever.py' (lines 16-19), the embeddings are initialized once as a module-level variable. Consider using a similar pattern here to avoid creating multiple embedding instances unnecessarily, which improves consistency and could prevent potential issues with resource management.
```python
def _get_vector_store() -> PGVector:
    """
    Get or create PGVector instance for document storage.

    Returns:
        PGVector instance configured with embeddings and connection
    """
    connection_string = _convert_database_url_to_psycopg(settings.database_url)
    embeddings = _get_embeddings()

    vector_store = PGVector(
        embeddings=embeddings,
        collection_name=COLLECTION_NAME,
        connection=connection_string,
        use_jsonb=True,
    )

    return vector_store
```
The function '_get_vector_store' creates a new PGVector instance on every call, and the same pattern exists in 'app/agents/nodes/retriever.py' (lines 51-72). Creating new vector store instances repeatedly can be inefficient. Consider implementing a singleton pattern, caching the instance, or using a module-level variable to reuse the same PGVector instance across calls.
```python
logger = logging.getLogger(__name__)

# Collection name for the vector store
COLLECTION_NAME = "document_chunks"
```
The constant 'COLLECTION_NAME' is defined here with the same value ("document_chunks") as in 'app/agents/nodes/retriever.py' (line 63). While the two copies currently agree, defining the same constant in multiple places can let the values drift apart. Consider defining this constant in a shared location (e.g., 'app/core/config.py' or 'app/core/constants.py').
Suggested change:

```diff
-logger = logging.getLogger(__name__)
-
-# Collection name for the vector store
-COLLECTION_NAME = "document_chunks"
+from app.agents.nodes.retriever import COLLECTION_NAME
+
+logger = logging.getLogger(__name__)
```
```python
logger.info(f" PDF: {OBJECT_NAME}")
logger.info(f" Pages extracted: {len(documents)}")
logger.info(f" Chunks created: {len(chunks)}")
logger.info(f" Chunks stored: {chunks_stored}")
```
The variable 'chunks_stored' is used in the summary but is only defined within the try block in Stage 3 (line 179). If Stage 3 fails or is skipped, this will cause a NameError when trying to log the summary. Initialize 'chunks_stored = 0' before Stage 3 to ensure it's always defined.
```python
def _create_document_record(filename: str, minio_path: str) -> int:
    """
    Create a Document record in the database.

    Args:
        filename: Original filename of the PDF
        minio_path: Path to the PDF in MinIO bucket

    Returns:
        int: The created document's ID
    """
    db = SessionLocal()
    try:
        document = Document(filename=filename, minio_path=minio_path)
        db.add(document)
        db.commit()
        db.refresh(document)
        logger.info(f"Created document record with id={document.id}")
        return document.id
    finally:
        db.close()
```
The function '_create_document_record' is duplicated in 'test_pipeline.py' (lines 115-126) with nearly identical implementation. Consider making this function public (rename to 'create_document_record') and importing it in test_pipeline.py from this module to avoid code duplication and maintain a single source of truth for document creation logic.
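A sketch of the single-source-of-truth shape the comment describes. Here `sqlite3` stands in for the SQLAlchemy `SessionLocal`/`Document` pair so the example runs stand-alone; the import path in the trailing comment is the hypothetical result of making the helper public:

```python
import sqlite3


def create_document_record(conn: sqlite3.Connection, filename: str, minio_path: str) -> int:
    """Public version of the helper: insert a document row and return its id.
    (The real implementation uses a SQLAlchemy session; sqlite3 is only a
    stand-in so this sketch needs no app code.)"""
    cur = conn.execute(
        "INSERT INTO documents (filename, minio_path) VALUES (?, ?)",
        (filename, minio_path),
    )
    conn.commit()
    return cur.lastrowid


# test_pipeline.py would then import the shared helper instead of redefining it:
# from app.services.pipeline import create_document_record

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, filename TEXT, minio_path TEXT)")
doc_id = create_document_record(conn, "report.pdf", "uploads/report.pdf")
```

Once the helper is public and imported, any change to how document rows are created (extra columns, error handling) applies to both the pipeline and the test script.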