Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion RAGManager/.env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/vectordb
# Database Configuration
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
Comment on lines +2 to +4
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states "Use psycopg2 (sync driver), NOT asyncpg" and the example uses "postgresql+psycopg2://". However, the pyproject.toml dependency was changed from "psycopg2-binary" to "psycopg[binary]>=3.0.0" (psycopg3). This creates a mismatch - the documentation suggests psycopg2 but the dependency is psycopg3. Consider updating the comment to clarify that while the DATABASE_URL should use the psycopg2 format, the vector_store will automatically convert it to psycopg3 format internally, or update the example to show "postgresql://" without a driver specification.

Suggested change
# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy.
# LangChain PGVector will automatically convert this to psycopg3 format internally.
DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb
# NOTE: Use a synchronous PostgreSQL URL (do NOT use async drivers like asyncpg).
# LangChain PGVector / SQLAlchemy will handle the appropriate psycopg3 driver internally.
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/vectordb

Copilot uses AI. Check for mistakes.

# RabbitMQ Configuration
RABBITMQ_USER=guest
Expand Down
2 changes: 1 addition & 1 deletion RAGManager/app/agents/nodes/retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,4 @@ def retriever(state: AgentState) -> AgentState:
# Store the unique chunk contents in state
updated_state["relevant_chunks"] = unique_chunks

return updated_state
return updated_state
25 changes: 0 additions & 25 deletions RAGManager/app/services/embedding_service.py

This file was deleted.

79 changes: 51 additions & 28 deletions RAGManager/app/services/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,55 @@
import logging

from langchain_core.documents import Document

from app.core.config import settings
from app.core.database_connection import SessionLocal
from app.models.document import Document
from app.services.chunking_service import document_to_chunks
from app.services.embedding_service import chunks_to_embeddings
from app.services.pdf_processor import pdf_to_document
from app.services.vector_store import store_chunks_with_embeddings

logger = logging.getLogger(__name__)


def _create_document_record(filename: str, minio_path: str) -> int:
    """
    Create a Document record in the database.

    Args:
        filename: Original filename of the PDF
        minio_path: Path to the PDF in MinIO bucket

    Returns:
        int: The created document's ID

    Raises:
        Exception: Any database error is re-raised after rolling back the session.
    """
    db = SessionLocal()
    try:
        document = Document(filename=filename, minio_path=minio_path)
        db.add(document)
        db.commit()
        # Refresh so the autogenerated primary key is populated on the instance.
        db.refresh(document)
        logger.info(f"Created document record with id={document.id}")
        return document.id
    except Exception:
        # Roll back the failed transaction so the connection is returned to
        # the pool in a clean state before the session is closed.
        db.rollback()
        raise
    finally:
        db.close()
Comment on lines +13 to +33
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function '_create_document_record' is duplicated in 'test_pipeline.py' (lines 115-126) with nearly identical implementation. Consider making this function public (rename to 'create_document_record') and importing it in test_pipeline.py from this module to avoid code duplication and maintain a single source of truth for document creation logic.

Copilot uses AI. Check for mistakes.


def process_pdf_pipeline(object_name: str) -> int:
"""
Orchestrates the PDF processing pipeline.

This function coordinates the three-stage pipeline:
1. PDF to LangChain Document
2. Document to Chunks
3. Chunks to Embeddings
4. Store in database (to be implemented)
3. Embed and Store in database using PGVector

Args:
object_name: Path/name of the PDF object in the MinIO bucket

Returns:
int: document_id of the created document (mock value for now)
int: document_id of the created document

Raises:
NotImplementedError: If any of the pipeline stages are not yet implemented
Exception: If any of the pipeline stages fail
"""
logger.info(f"Starting PDF processing pipeline for object: {object_name}")

Expand All @@ -44,27 +66,28 @@ def process_pdf_pipeline(object_name: str) -> int:
chunks = document_to_chunks(document, settings.chunk_size, settings.chunk_overlap)
logger.info(f"Stage 2 completed successfully. Created {len(chunks)} chunks")

# Stage 3: Chunks to Embeddings
logger.info("Stage 3: Generating embeddings for chunks")
embeddings = chunks_to_embeddings(chunks)
logger.info(f"Stage 3 completed successfully. Generated {len(embeddings)} embeddings")

# Stage 4: Store in database (placeholder - not implemented yet)
logger.info("Stage 4: Storing chunks and embeddings in database")
# TODO: Implement database storage
# This will:
# 1. Create a Document record in the documents table
# 2. Create DocumentChunk records with embeddings in the document_chunks table
# 3. Return the document_id
raise NotImplementedError("Database storage will be implemented later")

except NotImplementedError as e:
logger.warning(f"Pipeline stage not implemented: {e}")
# Return a mock document_id for now
# In production, this should be replaced with actual database storage
mock_document_id = 1
logger.info(f"Pipeline completed with mock document_id: {mock_document_id}")
return mock_document_id
# Stage 3: Embed and Store in database
# First, create the document record to get the document_id
logger.info("Stage 3: Embedding and storing chunks in database")

# Extract filename from object_name (e.g., "folder/file.pdf" -> "file.pdf")
filename = object_name.split("/")[-1] if "/" in object_name else object_name

# Create document record in the documents table
document_id = _create_document_record(filename=filename, minio_path=object_name)

# Store chunks with embeddings using PGVector
# This generates embeddings via OpenAI and stores in the vector database
chunks_stored = store_chunks_with_embeddings(
document_id=document_id,
filename=filename,
chunks=chunks,
)
logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings")

logger.info(f"Pipeline completed successfully. Document ID: {document_id}")
return document_id

except Exception as e:
logger.error(f"Error in PDF processing pipeline: {e}")
raise
Expand Down
162 changes: 162 additions & 0 deletions RAGManager/app/services/vector_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Vector Store Service - Handles embedding generation and storage using LangChain PGVector.

This service provides functionality to:
1. Initialize PGVector connection with OpenAI embeddings
2. Store document chunks with their embeddings in batches
3. Convert database URLs to psycopg3 format required by langchain-postgres
"""

import logging
from urllib.parse import urlparse, urlunparse

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector

from app.core.config import settings

logger = logging.getLogger(__name__)

# Collection name for the vector store
COLLECTION_NAME = "document_chunks"

Comment on lines +18 to +23
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constant 'COLLECTION_NAME' ("document_chunks") is also defined in 'app/agents/nodes/retriever.py' (line 63, with the same value "document_chunks"). While the two definitions currently agree, having the same constant defined in multiple places can lead to inconsistency if one is later changed without the other. Consider defining this constant in a shared location (e.g., 'app/core/config.py' or 'app/core/constants.py').

Suggested change
logger = logging.getLogger(__name__)
# Collection name for the vector store
COLLECTION_NAME = "document_chunks"
from app.agents.nodes.retriever import COLLECTION_NAME
logger = logging.getLogger(__name__)

Copilot uses AI. Check for mistakes.
# Batch size for inserting documents (to handle large PDFs efficiently)
DEFAULT_BATCH_SIZE = 100


def _convert_database_url_to_psycopg(database_url: str) -> str:
"""
Convert database URL to postgresql+psycopg format required by langchain-postgres.

LangChain PGVector requires postgresql+psycopg:// (psycopg3) format.
This function converts common formats (postgresql://, postgresql+psycopg2://) to the required format.

Args:
database_url: Original database URL

Returns:
Database URL in postgresql+psycopg:// format
"""
parsed = urlparse(database_url)

# Replace driver with psycopg (psycopg3)
if parsed.scheme.startswith("postgresql"):
# Remove any existing driver (e.g., +psycopg2)
base_scheme = "postgresql"
if "+" in parsed.scheme:
base_scheme = parsed.scheme.split("+")[0]

new_scheme = f"{base_scheme}+psycopg"
new_parsed = parsed._replace(scheme=new_scheme)
return urlunparse(new_parsed)

return database_url
Comment on lines +28 to +54
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function '_convert_database_url_to_psycopg' is duplicated in 'app/agents/nodes/retriever.py' (lines 22-48). The logic is identical. Consider extracting this function to a shared utility module (e.g., 'app/core/database_utils.py' or 'app/utils/database.py') to avoid code duplication and ensure consistency across the codebase.

Copilot uses AI. Check for mistakes.


# Lazily-created shared embeddings client, so repeated calls reuse one
# instance (same pattern as the module-level embeddings in the retriever node).
_embeddings_instance = None


def _get_embeddings() -> OpenAIEmbeddings:
    """
    Get a shared OpenAI embeddings instance configured from settings.

    The instance is created on first use and cached at module level, avoiding
    the cost of constructing a new client on every call.

    Returns:
        OpenAIEmbeddings instance
    """
    global _embeddings_instance
    if _embeddings_instance is None:
        _embeddings_instance = OpenAIEmbeddings(
            model=settings.embedding_model,
            openai_api_key=settings.openai_api_key,
        )
    return _embeddings_instance
Comment on lines +57 to +67
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function '_get_embeddings' creates a new OpenAIEmbeddings instance on every call. In 'app/agents/nodes/retriever.py' (lines 16-19), the embeddings are initialized once as a module-level variable. Consider using a similar pattern here to avoid creating multiple embedding instances unnecessarily, which improves consistency and could prevent potential issues with resource management.

Copilot uses AI. Check for mistakes.


# Lazily-created shared PGVector instance; creating one per call would also
# create a new SQLAlchemy engine (and connection pool) each time.
_vector_store_instance = None


def _get_vector_store() -> PGVector:
    """
    Get a shared PGVector instance for document storage.

    The instance is created on first use and cached at module level so
    repeated pipeline runs reuse the same connection pool.

    Returns:
        PGVector instance configured with embeddings and connection
    """
    global _vector_store_instance
    if _vector_store_instance is None:
        connection_string = _convert_database_url_to_psycopg(settings.database_url)
        _vector_store_instance = PGVector(
            embeddings=_get_embeddings(),
            collection_name=COLLECTION_NAME,
            connection=connection_string,
            # Store metadata as JSONB for efficient filtered retrieval.
            use_jsonb=True,
        )
    return _vector_store_instance
Comment on lines +70 to +87
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function '_get_vector_store' creates a new PGVector instance on every call, and the same pattern exists in 'app/agents/nodes/retriever.py' (lines 51-72). Creating new vector store instances repeatedly can be inefficient. Consider implementing a singleton pattern, caching the instance, or using a module-level variable to reuse the same PGVector instance across calls.

Copilot uses AI. Check for mistakes.


def store_chunks_with_embeddings(
    document_id: int,
    filename: str,
    chunks: list[Document],
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
    """
    Store document chunks with their embeddings in PGVector.

    This function:
    1. Prepares chunks with metadata (document_id, chunk_index, filename)
    2. Uses LangChain PGVector to generate embeddings and store in batches
    3. Returns the number of chunks stored

    Args:
        document_id: ID of the parent document in the documents table
        filename: Original filename for metadata
        chunks: List of LangChain Document chunks to embed and store
        batch_size: Number of chunks to process per batch (default: 100)

    Returns:
        int: Number of chunks successfully stored

    Raises:
        Exception: If storage fails
    """
    if not chunks:
        logger.warning("No chunks provided for storage")
        return 0

    logger.info(f"Storing {len(chunks)} chunks for document_id={document_id}")

    # Prepare documents with metadata for PGVector. Existing metadata from
    # chunking (e.g. source/page info) is preserved, but the pipeline-assigned
    # keys are merged LAST so chunk metadata can never shadow the
    # authoritative document_id / chunk_index / filename values.
    prepared_docs = [
        Document(
            page_content=chunk.page_content,
            metadata={
                **chunk.metadata,
                "document_id": document_id,
                "chunk_index": idx,
                "filename": filename,
            },
        )
        for idx, chunk in enumerate(chunks)
    ]

    # Get vector store instance
    vector_store = _get_vector_store()

    # Store documents in batches; the batch count is loop-invariant, so
    # compute it once rather than on every iteration.
    total_batches = (len(prepared_docs) + batch_size - 1) // batch_size
    total_stored = 0
    for i in range(0, len(prepared_docs), batch_size):
        batch = prepared_docs[i : i + batch_size]
        batch_num = (i // batch_size) + 1

        logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} chunks)")

        try:
            # PGVector.add_documents handles embedding generation internally
            vector_store.add_documents(batch)
            total_stored += len(batch)
            logger.debug(f"Batch {batch_num} stored successfully")
        except Exception as e:
            logger.error(f"Error storing batch {batch_num}: {e}")
            raise

    logger.info(f"Successfully stored {total_stored} chunks for document_id={document_id}")
    return total_stored
5 changes: 3 additions & 2 deletions RAGManager/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ dependencies = [
"langgraph>=1.0.4",
"langchain-core>=0.3.0",
"langchain-text-splitters>=0.3.0",
"langchain-postgres>=0.0.5",
"langchain-postgres>=0.0.14",
"langchain-openai>=0.3.0",
"typing-extensions>=4.15.0",
"uvicorn>=0.38.0",
"sqlalchemy>=2.0.0",
"psycopg2-binary>=2.9.0",
"psycopg[binary]>=3.0.0",
"pgvector>=0.3.0",
"pydantic-settings>=2.0.0",
"guardrails-ai>=0.5.10",
Expand Down
Loading