- Add vector_store.py service using langchain-postgres for embedding generation and PGVector storage with batch insert support
- Refactor pipeline.py to merge embedding and storage into a single stage
- Add test_pipeline.py with 4-stage validation (PDF→Chunks→Store→Retrieve)
- Update dependencies: langchain-postgres, langchain-openai, psycopg
- Add DATABASE_URL to .env.example with sync driver documentation

Storage uses the same "document_chunks" collection as retriever.py for compatibility. Batch size defaults to 100 chunks for large PDFs.
🔍 PR Validation Results
Pull request overview
This PR completes the implementation of the RAG pipeline by replacing the placeholder embedding service with a full integration of LangChain's PGVector for vector storage. The changes upgrade key dependencies to support psycopg3 and implement the final stage of the PDF processing pipeline that was previously marked as "to be implemented."
Key Changes:
- Upgraded `langchain-postgres` (0.0.5 → 0.0.14) and migrated from `psycopg2-binary` to `psycopg[binary]>=3.0.0` to support LangChain's requirements
- Added new `vector_store.py` service that handles embedding generation and storage using OpenAI embeddings with PGVector
- Refactored `pipeline.py` to create document records and store chunks with embeddings, replacing the previous NotImplementedError placeholder
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| RAGManager/pyproject.toml | Upgraded langchain-postgres and langchain-openai versions; migrated from psycopg2 to psycopg3 |
| RAGManager/app/services/vector_store.py | New service implementing embedding generation and batch storage to PGVector with database URL conversion utility |
| RAGManager/app/services/pipeline.py | Refactored to implement full pipeline with document creation and chunk storage; removed NotImplementedError placeholders |
| RAGManager/app/services/embedding_service.py | Removed placeholder service as functionality is now handled by vector_store.py |
| RAGManager/app/agents/nodes/retriever.py | Whitespace fix (removed trailing space) |
| RAGManager/.env.example | Added documentation notes about psycopg driver requirements and updated DATABASE_URL format |
```python
def _convert_database_url_to_psycopg(database_url: str) -> str:
    """
    Convert database URL to postgresql+psycopg format required by langchain-postgres.

    LangChain PGVector requires postgresql+psycopg:// (psycopg3) format.
    This function converts common formats (postgresql://, postgresql+psycopg2://) to the required format.

    Args:
        database_url: Original database URL

    Returns:
        Database URL in postgresql+psycopg:// format
    """
    parsed = urlparse(database_url)

    # Replace driver with psycopg (psycopg3)
    if parsed.scheme.startswith("postgresql"):
        # Remove any existing driver (e.g., +psycopg2)
        base_scheme = "postgresql"
        if "+" in parsed.scheme:
            base_scheme = parsed.scheme.split("+")[0]

        new_scheme = f"{base_scheme}+psycopg"
        new_parsed = parsed._replace(scheme=new_scheme)
        return urlunparse(new_parsed)

    return database_url
```
The database URL conversion logic is duplicated between this file and app/agents/nodes/retriever.py. This creates a maintenance burden where changes need to be made in two places. Consider extracting this function to a shared utility module (e.g., app/utils/database.py) to follow the DRY principle.
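A minimal sketch of that extraction (the `app/utils/database.py` path and the public function name are suggestions, not code from this PR):

```python
# app/utils/database.py -- proposed shared home for the URL conversion,
# so vector_store.py and retriever.py can both import one copy.
from urllib.parse import urlparse, urlunparse


def convert_database_url_to_psycopg(database_url: str) -> str:
    """Normalize any postgresql* URL to the postgresql+psycopg:// form."""
    parsed = urlparse(database_url)
    if parsed.scheme.startswith("postgresql"):
        # Strip any existing driver suffix (e.g. +psycopg2) before adding +psycopg
        base_scheme = parsed.scheme.split("+")[0]
        return urlunparse(parsed._replace(scheme=f"{base_scheme}+psycopg"))
    return database_url
```

Both services would then `from app.utils.database import convert_database_url_to_psycopg` instead of keeping private copies.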
```python
def _get_vector_store() -> PGVector:
    """
    Get or create PGVector instance for document storage.

    Returns:
        PGVector instance configured with embeddings and connection
    """
    connection_string = _convert_database_url_to_psycopg(settings.database_url)
    embeddings = _get_embeddings()

    vector_store = PGVector(
        embeddings=embeddings,
        collection_name=COLLECTION_NAME,
        connection=connection_string,
        use_jsonb=True,
    )

    return vector_store
```
Creating a new PGVector instance on every call is inefficient and may lead to connection pool exhaustion under high load. Consider caching or reusing the vector store instance, or at minimum reusing the connection string conversion result to avoid repeated URL parsing.
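One low-effort way to do this is `functools.lru_cache` on the factory. The sketch below uses a stand-in class in place of `PGVector` so the pattern is self-contained; in `vector_store.py` the body would build the real instance:

```python
from functools import lru_cache


class FakeVectorStore:
    """Stand-in for PGVector; counts constructions to show the cache works."""
    constructions = 0

    def __init__(self, connection: str) -> None:
        type(self).constructions += 1
        self.connection = connection


@lru_cache(maxsize=1)
def get_vector_store() -> FakeVectorStore:
    # URL conversion and store construction now happen once per process
    connection_string = "postgresql+psycopg://postgres:postgres@postgres:5432/vectordb"
    return FakeVectorStore(connection_string)


store_a = get_vector_store()
store_b = get_vector_store()  # returns the cached instance, no new construction
```

`get_vector_store.cache_clear()` resets the cache in tests. A cached instance pins its connection configuration for the process lifetime, which is acceptable here since `settings` does not change at runtime.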
```python
    Create a Document record in the database.

    Args:
        filename: Original filename of the PDF
        minio_path: Path to the PDF in MinIO bucket

    Returns:
        int: The created document's ID
```
The function documentation could be improved by mentioning that this function creates a database transaction and returns the ID. Also, consider documenting potential exceptions that could be raised (e.g., database connection errors, constraint violations).
Suggested change:

```diff
-    Create a Document record in the database.
-
-    Args:
-        filename: Original filename of the PDF
-        minio_path: Path to the PDF in MinIO bucket
-
-    Returns:
-        int: The created document's ID
+    Create a Document record in the database within a transaction and return its ID.
+
+    This helper opens a new database session, inserts a row into the ``documents`` table,
+    commits the transaction, and then returns the newly created document's primary key.
+
+    Args:
+        filename: Original filename of the PDF.
+        minio_path: Path to the PDF in the MinIO bucket.
+
+    Returns:
+        int: The created document's ID.
+
+    Raises:
+        DatabaseError: If there is a problem connecting to or communicating with the database.
+        IntegrityError: If inserting the document violates a database constraint
+            (for example, a uniqueness or foreign-key constraint).
```
```diff
     "uvicorn>=0.38.0",
     "sqlalchemy>=2.0.0",
-    "psycopg2-binary>=2.9.0",
+    "psycopg[binary]>=3.0.0",
```
The migration from psycopg2-binary to psycopg[binary]>=3.0.0 is a major version upgrade (psycopg2 to psycopg3). While psycopg3 is the modern async-capable version, ensure that all database operations in the codebase are compatible with psycopg3's API, which has some differences from psycopg2. The synchronous SQLAlchemy code should work, but verify that any raw SQL or connection handling is compatible.
```python
    chunks_stored = store_chunks_with_embeddings(
        document_id=document_id,
        filename=filename,
        chunks=chunks,
    )
    logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
```
If _create_document_record fails after the document is created but before returning, or if the subsequent store_chunks_with_embeddings call fails, the database will contain an orphaned document record with no associated chunks. This could lead to data inconsistency. Consider implementing a transaction boundary that spans both operations, or adding cleanup logic to remove the document record if chunk storage fails.
Suggested change:

```diff
-    chunks_stored = store_chunks_with_embeddings(
-        document_id=document_id,
-        filename=filename,
-        chunks=chunks,
-    )
-    logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
+    try:
+        chunks_stored = store_chunks_with_embeddings(
+            document_id=document_id,
+            filename=filename,
+            chunks=chunks,
+        )
+        logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
+    except Exception as storage_error:
+        logger.error(
+            f"Failed to store chunks with embeddings for document_id={document_id}. "
+            "Attempting to roll back created document record.",
+            exc_info=True,
+        )
+        cleanup_db = SessionLocal()
+        try:
+            document_record = cleanup_db.get(Document, document_id)
+            if document_record is not None:
+                cleanup_db.delete(document_record)
+                cleanup_db.commit()
+                logger.info(f"Rolled back document record with id={document_id} after chunk storage failure")
+        except Exception:
+            logger.error(
+                f"Failed to roll back document record with id={document_id} after chunk storage failure",
+                exc_info=True,
+            )
+        finally:
+            cleanup_db.close()
+        # Re-raise the original error to be handled by the outer exception handler
+        raise storage_error
```
```python
            total_stored += len(batch)
            logger.debug(f"Batch {batch_num} stored successfully")
        except Exception as e:
            logger.error(f"Error storing batch {batch_num}: {e}")
```
The error message at line 158 logs which batch failed, but doesn't provide context about the total number of chunks already stored before the failure. This makes it difficult to determine the extent of partial success. Consider including total_stored in the error message to help with debugging and recovery, e.g., "Error storing batch {batch_num}: {e}. Successfully stored {total_stored} chunks before failure."
Suggested change:

```diff
-            logger.error(f"Error storing batch {batch_num}: {e}")
+            logger.error(
+                f"Error storing batch {batch_num}: {e}. "
+                f"Successfully stored {total_stored} chunks before failure"
+            )
```
```python
def _get_embeddings() -> OpenAIEmbeddings:
    """
    Get OpenAI embeddings instance configured from settings.

    Returns:
        OpenAIEmbeddings instance
    """
    return OpenAIEmbeddings(
        model=settings.embedding_model,
        openai_api_key=settings.openai_api_key,
    )
```
The _get_embeddings() function creates a new OpenAIEmbeddings instance on every call, which is inefficient. Since the embeddings configuration doesn't change at runtime, consider initializing it once at the module level (similar to how it's done in retriever.py at line 16) to avoid repeated instantiation.
```
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
```
The comment states to "Use psycopg2 (sync driver), NOT asyncpg" but the actual DATABASE_URL now uses postgresql+psycopg2:// which contradicts the dependency change in pyproject.toml where psycopg2-binary was replaced with psycopg[binary]>=3.0.0 (psycopg3). This creates confusion about which driver version should actually be used. The comment should be updated to reflect that psycopg3 is now being used, or the DATABASE_URL should use a different format if psycopg2 is still intended.
Suggested change:

```diff
-# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
-# LangChain PGVector will automatically convert this to psycopg3 format internally.
-DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
+# NOTE: Use psycopg (psycopg3 sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
+# This DSN uses the SQLAlchemy psycopg3 (psycopg) driver explicitly.
+DATABASE_URL=postgresql+psycopg://postgres:postgres@postgres:5432/vectordb
```
```python
from app.core.config import settings
from app.core.database_connection import SessionLocal
from app.models.document import Document
```
Naming collision: The import from app.models.document import Document shadows the LangChain Document class that is used throughout the codebase. The variable document in lines 62 and 66 refers to a LangChain Document (returned by pdf_to_document), not the SQLAlchemy model. This creates confusion and could lead to errors. Consider renaming the SQLAlchemy model import to avoid the collision, for example: from app.models.document import Document as DocumentModel.
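The aliasing fix looks like the snippet below. The stand-in classes are hypothetical placeholders for the real SQLAlchemy model and `langchain_core.documents.Document`, so the pattern runs without either package:

```python
# Real imports after the fix would be:
#   from app.models.document import Document as DocumentModel   # SQLAlchemy model
#   from langchain_core.documents import Document               # LangChain Document


class _SQLAlchemyDocument:
    """Stand-in for the ORM model in app/models/document.py."""
    def __init__(self, filename: str, minio_path: str) -> None:
        self.filename = filename
        self.minio_path = minio_path


class _LangChainDocument:
    """Stand-in for langchain_core.documents.Document."""
    def __init__(self, page_content: str) -> None:
        self.page_content = page_content


DocumentModel = _SQLAlchemyDocument  # aliased ORM model: no shadowing
Document = _LangChainDocument        # LangChain name stays free for pipeline code

record = DocumentModel(filename="report.pdf", minio_path="pdfs/report.pdf")
chunk = Document(page_content="first chunk of text")
```

With the alias in place, `Document` in the pipeline unambiguously means the LangChain class returned by `pdf_to_document`, and every database access uses `DocumentModel`.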
```python
    db.commit()
    db.refresh(document)
    logger.info(f"Created document record with id={document.id}")
    return document.id
```
The database session is not properly handled in case of exceptions during commit or refresh. If db.commit() or db.refresh(document) raises an exception, the session should be rolled back before closing to avoid leaving the transaction in an inconsistent state. Consider wrapping the database operations in a try-except block that calls db.rollback() on exception before re-raising.
Suggested change:

```diff
         return document.id
+    except Exception:
+        db.rollback()
+        raise
```
Removed test_pipeline.py