5 changes: 4 additions & 1 deletion RAGManager/.env.example
@@ -1,4 +1,7 @@
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/vectordb
# Database Configuration
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
Comment on lines +2 to +4

Copilot AI Dec 17, 2025


The comment states to "Use psycopg2 (sync driver), NOT asyncpg" but the actual DATABASE_URL now uses postgresql+psycopg2:// which contradicts the dependency change in pyproject.toml where psycopg2-binary was replaced with psycopg[binary]>=3.0.0 (psycopg3). This creates confusion about which driver version should actually be used. The comment should be updated to reflect that psycopg3 is now being used, or the DATABASE_URL should use a different format if psycopg2 is still intended.

Suggested change
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
# NOTE: Use psycopg (psycopg3 sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# This DSN uses the SQLAlchemy psycopg3 (psycopg) driver explicitly.
DATABASE_URL=postgresql+psycopg://postgres:postgres@postgres:5432/vectordb

Copilot uses AI. Check for mistakes.
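SQLAlchemy selects the DBAPI driver from the portion of the URL scheme after the `+`, which is exactly what the three DSN variants discussed here disagree on. A standalone sketch of that scheme parsing (plain string handling, no SQLAlchemy required; the hostnames are the example values from this diff):

```python
def driver_from_dsn(dsn: str) -> str:
    """Return the explicit driver named in a SQLAlchemy-style DSN, or '' if none."""
    scheme = dsn.split("://", 1)[0]
    _backend, _, driver = scheme.partition("+")
    return driver

# postgresql://          -> no explicit driver (SQLAlchemy falls back to its default)
# postgresql+psycopg2:// -> psycopg2 (the legacy driver)
# postgresql+psycopg://  -> psycopg (psycopg3), which langchain-postgres expects
print(driver_from_dsn("postgresql://postgres:postgres@postgres:5432/vectordb"))           # ""
print(driver_from_dsn("postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb"))  # "psycopg2"
print(driver_from_dsn("postgresql+psycopg://postgres:postgres@postgres:5432/vectordb"))   # "psycopg"
```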

# RabbitMQ Configuration
RABBITMQ_USER=guest
2 changes: 1 addition & 1 deletion RAGManager/app/agents/nodes/retriever.py
@@ -163,4 +163,4 @@ def retriever(state: AgentState) -> AgentState:
# Store the unique chunk contents in state
updated_state["relevant_chunks"] = unique_chunks

return updated_state
return updated_state
25 changes: 0 additions & 25 deletions RAGManager/app/services/embedding_service.py

This file was deleted.

79 changes: 51 additions & 28 deletions RAGManager/app/services/pipeline.py
@@ -1,33 +1,55 @@
import logging

from langchain_core.documents import Document

from app.core.config import settings
from app.core.database_connection import SessionLocal
from app.models.document import Document

Copilot AI Dec 17, 2025


Naming collision: The import from app.models.document import Document shadows the LangChain Document class that is used throughout the codebase. The variable document in lines 62 and 66 refers to a LangChain Document (returned by pdf_to_document), not the SQLAlchemy model. This creates confusion and could lead to errors. Consider renaming the SQLAlchemy model import to avoid the collision, for example: from app.models.document import Document as DocumentModel.
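The suggested alias can be illustrated with stand-in classes (these are sketches, not the real implementations; only the names mirror the diff):

```python
class Document:
    """Stand-in for langchain_core.documents.Document."""
    def __init__(self, page_content, metadata=None):
        self.page_content = page_content
        self.metadata = metadata or {}

class DocumentModel:
    """Stand-in for app.models.document.Document, imported under an alias:
        from app.models.document import Document as DocumentModel
    """
    def __init__(self, filename, minio_path):
        self.filename = filename
        self.minio_path = minio_path

# With the alias, both names coexist in one module without shadowing:
chunk = Document(page_content="some text")
record = DocumentModel(filename="file.pdf", minio_path="folder/file.pdf")
```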

from app.services.chunking_service import document_to_chunks
from app.services.embedding_service import chunks_to_embeddings
from app.services.pdf_processor import pdf_to_document
from app.services.vector_store import store_chunks_with_embeddings

logger = logging.getLogger(__name__)


def _create_document_record(filename: str, minio_path: str) -> int:
"""
Create a Document record in the database.

Args:
filename: Original filename of the PDF
minio_path: Path to the PDF in MinIO bucket

Returns:
int: The created document's ID
Comment on lines +15 to +22

Copilot AI Dec 17, 2025


The function documentation could be improved by mentioning that this function creates a database transaction and returns the ID. Also, consider documenting potential exceptions that could be raised (e.g., database connection errors, constraint violations).

Suggested change
Create a Document record in the database.
Args:
filename: Original filename of the PDF
minio_path: Path to the PDF in MinIO bucket
Returns:
int: The created document's ID
Create a Document record in the database within a transaction and return its ID.
This helper opens a new database session, inserts a row into the ``documents`` table,
commits the transaction, and then returns the newly created document's primary key.
Args:
filename: Original filename of the PDF.
minio_path: Path to the PDF in the MinIO bucket.
Returns:
int: The created document's ID.
Raises:
DatabaseError: If there is a problem connecting to or communicating with the database.
IntegrityError: If inserting the document violates a database constraint
(for example, a uniqueness or foreign-key constraint).

"""
db = SessionLocal()
try:
document = Document(filename=filename, minio_path=minio_path)
db.add(document)
db.commit()
db.refresh(document)
logger.info(f"Created document record with id={document.id}")
return document.id

Copilot AI Dec 17, 2025


The database session is not properly handled in case of exceptions during commit or refresh. If db.commit() or db.refresh(document) raises an exception, the session should be rolled back before closing to avoid leaving the transaction in an inconsistent state. Consider wrapping the database operations in a try-except block that calls db.rollback() on exception before re-raising.

Suggested change
return document.id
return document.id
except Exception:
db.rollback()
raise

finally:
db.close()
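The commit-or-rollback shape the review asks for can be sketched with stdlib `sqlite3` standing in for the SQLAlchemy session (illustrative only; the real code uses `SessionLocal` and the ORM):

```python
import sqlite3

def create_document_record(conn: sqlite3.Connection, filename: str, minio_path: str) -> int:
    """Insert a document row; roll back on any failure so the transaction stays clean."""
    try:
        cur = conn.execute(
            "INSERT INTO documents (filename, minio_path) VALUES (?, ?)",
            (filename, minio_path),
        )
        conn.commit()
        return cur.lastrowid
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, filename TEXT, minio_path TEXT)")
doc_id = create_document_record(conn, "file.pdf", "folder/file.pdf")
print(doc_id)  # 1
```

The same try/except/raise wrapping translates directly to `db.commit()` / `db.rollback()` on a SQLAlchemy session, with `finally: db.close()` unchanged.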


def process_pdf_pipeline(object_name: str) -> int:
"""
Orchestrates the PDF processing pipeline.

This function coordinates the three-stage pipeline:
1. PDF to LangChain Document
2. Document to Chunks
3. Chunks to Embeddings
4. Store in database (to be implemented)
3. Embed and Store in database using PGVector

Args:
object_name: Path/name of the PDF object in the MinIO bucket

Returns:
int: document_id of the created document (mock value for now)
int: document_id of the created document

Raises:
NotImplementedError: If any of the pipeline stages are not yet implemented
Exception: If any of the pipeline stages fail
"""
logger.info(f"Starting PDF processing pipeline for object: {object_name}")

@@ -44,27 +66,28 @@ def process_pdf_pipeline(object_name: str) -> int:
chunks = document_to_chunks(document, settings.chunk_size, settings.chunk_overlap)
logger.info(f"Stage 2 completed successfully. Created {len(chunks)} chunks")

# Stage 3: Chunks to Embeddings
logger.info("Stage 3: Generating embeddings for chunks")
embeddings = chunks_to_embeddings(chunks)
logger.info(f"Stage 3 completed successfully. Generated {len(embeddings)} embeddings")

# Stage 4: Store in database (placeholder - not implemented yet)
logger.info("Stage 4: Storing chunks and embeddings in database")
# TODO: Implement database storage
# This will:
# 1. Create a Document record in the documents table
# 2. Create DocumentChunk records with embeddings in the document_chunks table
# 3. Return the document_id
raise NotImplementedError("Database storage will be implemented later")

except NotImplementedError as e:
logger.warning(f"Pipeline stage not implemented: {e}")
# Return a mock document_id for now
# In production, this should be replaced with actual database storage
mock_document_id = 1
logger.info(f"Pipeline completed with mock document_id: {mock_document_id}")
return mock_document_id
# Stage 3: Embed and Store in database
# First, create the document record to get the document_id
logger.info("Stage 3: Embedding and storing chunks in database")

# Extract filename from object_name (e.g., "folder/file.pdf" -> "file.pdf")
filename = object_name.split("/")[-1] if "/" in object_name else object_name

# Create document record in the documents table
document_id = _create_document_record(filename=filename, minio_path=object_name)

# Store chunks with embeddings using PGVector
# This generates embeddings via OpenAI and stores in the vector database
chunks_stored = store_chunks_with_embeddings(
document_id=document_id,
filename=filename,
chunks=chunks,
)
logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
Comment on lines +81 to +86

Copilot AI Dec 17, 2025


If _create_document_record fails after the document is created but before returning, or if the subsequent store_chunks_with_embeddings call fails, the database will contain an orphaned document record with no associated chunks. This could lead to data inconsistency. Consider implementing a transaction boundary that spans both operations, or adding cleanup logic to remove the document record if chunk storage fails.

Suggested change
chunks_stored = store_chunks_with_embeddings(
document_id=document_id,
filename=filename,
chunks=chunks,
)
logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
try:
chunks_stored = store_chunks_with_embeddings(
document_id=document_id,
filename=filename,
chunks=chunks,
)
logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")
except Exception as storage_error:
logger.error(
f"Failed to store chunks with embeddings for document_id={document_id}. "
"Attempting to roll back created document record.",
exc_info=True,
)
cleanup_db = SessionLocal()
try:
document_record = cleanup_db.get(Document, document_id)
if document_record is not None:
cleanup_db.delete(document_record)
cleanup_db.commit()
logger.info(f"Rolled back document record with id={document_id} after chunk storage failure")
except Exception:
logger.error(
f"Failed to roll back document record with id={document_id} after chunk storage failure",
exc_info=True,
)
finally:
cleanup_db.close()
# Re-raise the original error to be handled by the outer exception handler
raise storage_error


logger.info(f"Pipeline completed successfully. Document ID: {document_id}")
return document_id

except Exception as e:
logger.error(f"Error in PDF processing pipeline: {e}")
raise
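Stripped of the database details, the compensating-cleanup pattern the review suggests is: create, try to store, delete on failure, re-raise. A minimal sketch with injected callables (all names here are hypothetical):

```python
def run_with_cleanup(create_record, store_chunks, delete_record):
    """Create a record, attempt storage, and delete the record if storage fails."""
    record_id = create_record()
    try:
        store_chunks(record_id)
    except Exception:
        # Best-effort compensation; the original error still propagates.
        try:
            delete_record(record_id)
        except Exception:
            pass
        raise
    return record_id

deleted = []
def failing_store(_id):
    raise RuntimeError("storage failed")

try:
    run_with_cleanup(lambda: 42, failing_store, deleted.append)
except RuntimeError:
    pass
print(deleted)  # [42]
```

Note this is cleanup, not a true transaction: if the process dies between the two steps, the orphan still exists, which is why a single transaction boundary is the stronger fix where feasible.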
162 changes: 162 additions & 0 deletions RAGManager/app/services/vector_store.py
@@ -0,0 +1,162 @@
"""
Vector Store Service - Handles embedding generation and storage using LangChain PGVector.

This service provides functionality to:
1. Initialize PGVector connection with OpenAI embeddings
2. Store document chunks with their embeddings in batches
3. Convert database URLs to psycopg3 format required by langchain-postgres
"""

import logging
from urllib.parse import urlparse, urlunparse

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector

from app.core.config import settings

logger = logging.getLogger(__name__)

# Collection name for the vector store
COLLECTION_NAME = "document_chunks"

# Batch size for inserting documents (to handle large PDFs efficiently)
DEFAULT_BATCH_SIZE = 100


def _convert_database_url_to_psycopg(database_url: str) -> str:
"""
Convert database URL to postgresql+psycopg format required by langchain-postgres.

LangChain PGVector requires postgresql+psycopg:// (psycopg3) format.
This function converts common formats (postgresql://, postgresql+psycopg2://) to the required format.

Args:
database_url: Original database URL

Returns:
Database URL in postgresql+psycopg:// format
"""
parsed = urlparse(database_url)

# Replace driver with psycopg (psycopg3)
if parsed.scheme.startswith("postgresql"):
# Remove any existing driver (e.g., +psycopg2)
base_scheme = "postgresql"
if "+" in parsed.scheme:
base_scheme = parsed.scheme.split("+")[0]

new_scheme = f"{base_scheme}+psycopg"
new_parsed = parsed._replace(scheme=new_scheme)
return urlunparse(new_parsed)

return database_url
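The conversion above is pure `urllib.parse` work, so it can be exercised standalone (function body mirrored from the diff, condensed):

```python
from urllib.parse import urlparse, urlunparse

def convert_database_url_to_psycopg(database_url: str) -> str:
    # Mirrors _convert_database_url_to_psycopg from the diff.
    parsed = urlparse(database_url)
    if parsed.scheme.startswith("postgresql"):
        base_scheme = parsed.scheme.split("+")[0] if "+" in parsed.scheme else "postgresql"
        return urlunparse(parsed._replace(scheme=f"{base_scheme}+psycopg"))
    return database_url

print(convert_database_url_to_psycopg("postgresql://u:p@db:5432/vectordb"))
# postgresql+psycopg://u:p@db:5432/vectordb
print(convert_database_url_to_psycopg("postgresql+psycopg2://u:p@db:5432/vectordb"))
# postgresql+psycopg://u:p@db:5432/vectordb
```

Non-PostgreSQL URLs pass through untouched, since the scheme check fails and the original string is returned.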
Comment on lines +28 to +54

Copilot AI Dec 17, 2025


The database URL conversion logic is duplicated between this file and app/agents/nodes/retriever.py. This creates a maintenance burden where changes need to be made in two places. Consider extracting this function to a shared utility module (e.g., app/utils/database.py) to follow the DRY principle.



def _get_embeddings() -> OpenAIEmbeddings:
"""
Get OpenAI embeddings instance configured from settings.

Returns:
OpenAIEmbeddings instance
"""
return OpenAIEmbeddings(
model=settings.embedding_model,
openai_api_key=settings.openai_api_key,
)
Comment on lines +57 to +67

Copilot AI Dec 17, 2025


The _get_embeddings() function creates a new OpenAIEmbeddings instance on every call, which is inefficient. Since the embeddings configuration doesn't change at runtime, consider initializing it once at the module level (similar to how it's done in retriever.py at line 16) to avoid repeated instantiation.
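One way to get that reuse without changing call sites is to memoize the factory; `functools.lru_cache` gives a one-line lazy singleton (sketch with a placeholder object standing in for `OpenAIEmbeddings`):

```python
from functools import lru_cache

@lru_cache(maxsize=1)
def get_embeddings():
    # In the real service this would construct:
    #   OpenAIEmbeddings(model=settings.embedding_model, openai_api_key=...)
    return object()  # placeholder for the embeddings client

# Every caller now shares one instance; construction happens on first use only.
print(get_embeddings() is get_embeddings())  # True
```

Compared with a module-level constant, the lazy version also avoids constructing the client at import time, which keeps test imports cheap.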



def _get_vector_store() -> PGVector:
"""
Get or create PGVector instance for document storage.

Returns:
PGVector instance configured with embeddings and connection
"""
connection_string = _convert_database_url_to_psycopg(settings.database_url)
embeddings = _get_embeddings()

vector_store = PGVector(
embeddings=embeddings,
collection_name=COLLECTION_NAME,
connection=connection_string,
use_jsonb=True,
)

return vector_store
Comment on lines +70 to +87

Copilot AI Dec 17, 2025


Creating a new PGVector instance on every call is inefficient and may lead to connection pool exhaustion under high load. Consider caching or reusing the vector store instance, or at minimum reusing the connection string conversion result to avoid repeated URL parsing.



def store_chunks_with_embeddings(
document_id: int,
filename: str,
chunks: list[Document],
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""
Store document chunks with their embeddings in PGVector.

This function:
1. Prepares chunks with metadata (document_id, chunk_index, filename)
2. Uses LangChain PGVector to generate embeddings and store in batches
3. Returns the number of chunks stored

Args:
document_id: ID of the parent document in the documents table
filename: Original filename for metadata
chunks: List of LangChain Document chunks to embed and store
batch_size: Number of chunks to process per batch (default: 100)

Returns:
int: Number of chunks successfully stored

Raises:
Exception: If storage fails
"""
if not chunks:
logger.warning("No chunks provided for storage")
return 0

logger.info(f"Storing {len(chunks)} chunks for document_id={document_id}")

# Prepare documents with metadata for PGVector
prepared_docs = []
for idx, chunk in enumerate(chunks):
# Create a new document with enriched metadata
metadata = {
"document_id": document_id,
"chunk_index": idx,
"filename": filename,
# Preserve any existing metadata from chunking
**chunk.metadata,
}
prepared_docs.append(
Document(
page_content=chunk.page_content,
metadata=metadata,
)
)

# Get vector store instance
vector_store = _get_vector_store()

# Store documents in batches
total_stored = 0
for i in range(0, len(prepared_docs), batch_size):
batch = prepared_docs[i : i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (len(prepared_docs) + batch_size - 1) // batch_size

logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} chunks)")

try:
# PGVector.add_documents handles embedding generation internally
vector_store.add_documents(batch)
total_stored += len(batch)
logger.debug(f"Batch {batch_num} stored successfully")
except Exception as e:
logger.error(f"Error storing batch {batch_num}: {e}")

Copilot AI Dec 17, 2025


The error message at line 158 logs which batch failed, but doesn't provide context about the total number of chunks already stored before the failure. This makes it difficult to determine the extent of partial success. Consider including total_stored in the error message to help with debugging and recovery, e.g., "Error storing batch {batch_num}: {e}. Successfully stored {total_stored} chunks before failure."

Suggested change
logger.error(f"Error storing batch {batch_num}: {e}")
logger.error(
f"Error storing batch {batch_num}: {e}. "
f"Successfully stored {total_stored} chunks before failure"
)

raise

logger.info(f"Successfully stored {total_stored} chunks for document_id={document_id}")
return total_stored
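The batch arithmetic in the storage loop above (slice stepping plus ceiling division for the batch count) can be checked in isolation:

```python
def batches(items, size):
    """Yield consecutive slices of at most `size` items, as the storage loop does."""
    for i in range(0, len(items), size):
        yield items[i : i + size]

chunks = list(range(250))
batch_size = 100
total_batches = (len(chunks) + batch_size - 1) // batch_size  # ceiling division
print(total_batches)                                  # 3
print([len(b) for b in batches(chunks, batch_size)])  # [100, 100, 50]
```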
5 changes: 3 additions & 2 deletions RAGManager/pyproject.toml
@@ -10,11 +10,12 @@ dependencies = [
"langgraph>=1.0.4",
"langchain-core>=0.3.0",
"langchain-text-splitters>=0.3.0",
"langchain-postgres>=0.0.5",
"langchain-postgres>=0.0.14",
"langchain-openai>=0.3.0",
"typing-extensions>=4.15.0",
"uvicorn>=0.38.0",
"sqlalchemy>=2.0.0",
"psycopg2-binary>=2.9.0",
"psycopg[binary]>=3.0.0",

Copilot AI Dec 17, 2025


The migration from psycopg2-binary to psycopg[binary]>=3.0.0 is a major version upgrade (psycopg2 to psycopg3). While psycopg3 is the modern async-capable version, ensure that all database operations in the codebase are compatible with psycopg3's API, which has some differences from psycopg2. The synchronous SQLAlchemy code should work, but verify that any raw SQL or connection handling is compatible.

"pgvector>=0.3.0",
"pydantic-settings>=2.0.0",
"guardrails-ai>=0.5.10",