diff --git a/RAGManager/.env.example b/RAGManager/.env.example index f61c8f4..895cbeb 100644 --- a/RAGManager/.env.example +++ b/RAGManager/.env.example @@ -1,4 +1,7 @@ -DATABASE_URL=postgresql://postgres:postgres@postgres:5432/vectordb +# Database Configuration +# NOTE: Use psycopg2 (sync driver), NOT asyncpg. The RAGManager uses synchronous SQLAlchemy. +# LangChain PGVector will automatically convert this to psycopg3 format internally. +DATABASE_URL=postgresql+psycopg2://postgres:postgres@postgres:5432/vectordb # RabbitMQ Configuration RABBITMQ_USER=guest diff --git a/RAGManager/app/agents/nodes/retriever.py b/RAGManager/app/agents/nodes/retriever.py index 0f2ec42..90bbdd0 100644 --- a/RAGManager/app/agents/nodes/retriever.py +++ b/RAGManager/app/agents/nodes/retriever.py @@ -163,4 +163,4 @@ def retriever(state: AgentState) -> AgentState: # Store the unique chunk contents in state updated_state["relevant_chunks"] = unique_chunks - return updated_state + return updated_state \ No newline at end of file diff --git a/RAGManager/app/services/embedding_service.py b/RAGManager/app/services/embedding_service.py deleted file mode 100644 index b70500d..0000000 --- a/RAGManager/app/services/embedding_service.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import List, Tuple - -from langchain_core.documents import Document - - -def chunks_to_embeddings(chunks: List[Document]) -> List[Tuple[str, List[float]]]: - """ - Placeholder function - to be implemented later. - - This function will: - 1. Generate embeddings for each chunk using OpenAI's embedding API - 2. Return a list of tuples containing chunk content and its embedding vector - - Args: - chunks: List of LangChain Document chunks to embed - - Returns: - List[Tuple[str, List[float]]]: List of tuples containing (content, embedding_vector) - where embedding_vector is a list of floats with dimension 1536 - - Raises: - NotImplementedError: This function is not yet implemented - """ - raise NotImplementedError("This function will be implemented later") - diff --git a/RAGManager/app/services/pipeline.py b/RAGManager/app/services/pipeline.py index d9cce0c..47fd346 100644 --- a/RAGManager/app/services/pipeline.py +++ b/RAGManager/app/services/pipeline.py @@ -1,15 +1,38 @@ import logging -from langchain_core.documents import Document - from app.core.config import settings +from app.core.database_connection import SessionLocal +from app.models.document import Document from app.services.chunking_service import document_to_chunks -from app.services.embedding_service import chunks_to_embeddings from app.services.pdf_processor import pdf_to_document +from app.services.vector_store import store_chunks_with_embeddings logger = logging.getLogger(__name__) +def _create_document_record(filename: str, minio_path: str) -> int: + """ + Create a Document record in the database. + + Args: + filename: Original filename of the PDF + minio_path: Path to the PDF in MinIO bucket + + Returns: + int: The created document's ID + """ + db = SessionLocal() + try: + document = Document(filename=filename, minio_path=minio_path) + db.add(document) + db.commit() + db.refresh(document) + logger.info(f"Created document record with id={document.id}") + return document.id + finally: + db.close() + + def process_pdf_pipeline(object_name: str) -> int: """ Orchestrates the PDF processing pipeline. @@ -17,17 +40,16 @@ def process_pdf_pipeline(object_name: str) -> int: This function coordinates the three-stage pipeline: 1. PDF to LangChain Document 2. Document to Chunks - 3. Chunks to Embeddings - 4. Store in database (to be implemented) + 3. Embed and Store in database using PGVector Args: object_name: Path/name of the PDF object in the MinIO bucket Returns: - int: document_id of the created document (mock value for now) + int: document_id of the created document Raises: - NotImplementedError: If any of the pipeline stages are not yet implemented + Exception: If any of the pipeline stages fail """ logger.info(f"Starting PDF processing pipeline for object: {object_name}") @@ -44,27 +66,28 @@ def process_pdf_pipeline(object_name: str) -> int: chunks = document_to_chunks(document, settings.chunk_size, settings.chunk_overlap) logger.info(f"Stage 2 completed successfully. Created {len(chunks)} chunks") - # Stage 3: Chunks to Embeddings - logger.info("Stage 3: Generating embeddings for chunks") - embeddings = chunks_to_embeddings(chunks) - logger.info(f"Stage 3 completed successfully. Generated {len(embeddings)} embeddings") - - # Stage 4: Store in database (placeholder - not implemented yet) - logger.info("Stage 4: Storing chunks and embeddings in database") - # TODO: Implement database storage - # This will: - # 1. Create a Document record in the documents table - # 2. Create DocumentChunk records with embeddings in the document_chunks table - # 3. Return the document_id - raise NotImplementedError("Database storage will be implemented later") - - except NotImplementedError as e: - logger.warning(f"Pipeline stage not implemented: {e}") - # Return a mock document_id for now - # In production, this should be replaced with actual database storage - mock_document_id = 1 - logger.info(f"Pipeline completed with mock document_id: {mock_document_id}") - return mock_document_id + # Stage 3: Embed and Store in database + # First, create the document record to get the document_id + logger.info("Stage 3: Embedding and storing chunks in database") + + # Extract filename from object_name (e.g., "folder/file.pdf" -> "file.pdf") + filename = object_name.split("/")[-1] if "/" in object_name else object_name + + # Create document record in the documents table + document_id = _create_document_record(filename=filename, minio_path=object_name) + + # Store chunks with embeddings using PGVector + # This generates embeddings via OpenAI and stores in the vector database + chunks_stored = store_chunks_with_embeddings( + document_id=document_id, + filename=filename, + chunks=chunks, + ) + logger.info(f"Stage 3 completed successfully. Stored {chunks_stored} chunks with embeddings") + + logger.info(f"Pipeline completed successfully. Document ID: {document_id}") + return document_id + except Exception as e: logger.error(f"Error in PDF processing pipeline: {e}") raise diff --git a/RAGManager/app/services/vector_store.py b/RAGManager/app/services/vector_store.py new file mode 100644 index 0000000..8f57c9e --- /dev/null +++ b/RAGManager/app/services/vector_store.py @@ -0,0 +1,162 @@ +""" +Vector Store Service - Handles embedding generation and storage using LangChain PGVector. + +This service provides functionality to: +1. Initialize PGVector connection with OpenAI embeddings +2. Store document chunks with their embeddings in batches +3. Convert database URLs to psycopg3 format required by langchain-postgres +""" + +import logging +from urllib.parse import urlparse, urlunparse + +from langchain_core.documents import Document +from langchain_openai import OpenAIEmbeddings +from langchain_postgres import PGVector + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Collection name for the vector store +COLLECTION_NAME = "document_chunks" + +# Batch size for inserting documents (to handle large PDFs efficiently) +DEFAULT_BATCH_SIZE = 100 + + +def _convert_database_url_to_psycopg(database_url: str) -> str: + """ + Convert database URL to postgresql+psycopg format required by langchain-postgres. + + LangChain PGVector requires postgresql+psycopg:// (psycopg3) format. + This function converts common formats (postgresql://, postgresql+psycopg2://) to the required format. + + Args: + database_url: Original database URL + + Returns: + Database URL in postgresql+psycopg:// format + """ + parsed = urlparse(database_url) + + # Replace driver with psycopg (psycopg3) + if parsed.scheme.startswith("postgresql"): + # Remove any existing driver (e.g., +psycopg2) + base_scheme = "postgresql" + if "+" in parsed.scheme: + base_scheme = parsed.scheme.split("+")[0] + + new_scheme = f"{base_scheme}+psycopg" + new_parsed = parsed._replace(scheme=new_scheme) + return urlunparse(new_parsed) + + return database_url + + +def _get_embeddings() -> OpenAIEmbeddings: + """ + Get OpenAI embeddings instance configured from settings. + + Returns: + OpenAIEmbeddings instance + """ + return OpenAIEmbeddings( + model=settings.embedding_model, + openai_api_key=settings.openai_api_key, + ) + + +def _get_vector_store() -> PGVector: + """ + Get or create PGVector instance for document storage. + + Returns: + PGVector instance configured with embeddings and connection + """ + connection_string = _convert_database_url_to_psycopg(settings.database_url) + embeddings = _get_embeddings() + + vector_store = PGVector( + embeddings=embeddings, + collection_name=COLLECTION_NAME, + connection=connection_string, + use_jsonb=True, + ) + + return vector_store + + +def store_chunks_with_embeddings( + document_id: int, + filename: str, + chunks: list[Document], + batch_size: int = DEFAULT_BATCH_SIZE, +) -> int: + """ + Store document chunks with their embeddings in PGVector. + + This function: + 1. Prepares chunks with metadata (document_id, chunk_index, filename) + 2. Uses LangChain PGVector to generate embeddings and store in batches + 3. Returns the number of chunks stored + + Args: + document_id: ID of the parent document in the documents table + filename: Original filename for metadata + chunks: List of LangChain Document chunks to embed and store + batch_size: Number of chunks to process per batch (default: 100) + + Returns: + int: Number of chunks successfully stored + + Raises: + Exception: If storage fails + """ + if not chunks: + logger.warning("No chunks provided for storage") + return 0 + + logger.info(f"Storing {len(chunks)} chunks for document_id={document_id}") + + # Prepare documents with metadata for PGVector + prepared_docs = [] + for idx, chunk in enumerate(chunks): + # Create a new document with enriched metadata + metadata = { + "document_id": document_id, + "chunk_index": idx, + "filename": filename, + # Preserve any existing metadata from chunking + **chunk.metadata, + } + prepared_docs.append( + Document( + page_content=chunk.page_content, + metadata=metadata, + ) + ) + + # Get vector store instance + vector_store = _get_vector_store() + + # Store documents in batches + total_stored = 0 + for i in range(0, len(prepared_docs), batch_size): + batch = prepared_docs[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (len(prepared_docs) + batch_size - 1) // batch_size + + logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} chunks)") + + try: + # PGVector.add_documents handles embedding generation internally + vector_store.add_documents(batch) + total_stored += len(batch) + logger.debug(f"Batch {batch_num} stored successfully") + except Exception as e: + logger.error(f"Error storing batch {batch_num}: {e}") + raise + + logger.info(f"Successfully stored {total_stored} chunks for document_id={document_id}") + return total_stored diff --git a/RAGManager/pyproject.toml b/RAGManager/pyproject.toml index 4525963..45720b6 100644 --- a/RAGManager/pyproject.toml +++ b/RAGManager/pyproject.toml @@ -10,11 +10,12 @@ dependencies = [ "langgraph>=1.0.4", "langchain-core>=0.3.0", "langchain-text-splitters>=0.3.0", - "langchain-postgres>=0.0.5", + "langchain-postgres>=0.0.14", + "langchain-openai>=0.3.0", "typing-extensions>=4.15.0", "uvicorn>=0.38.0", "sqlalchemy>=2.0.0", - "psycopg2-binary>=2.9.0", + "psycopg[binary]>=3.0.0", "pgvector>=0.3.0", "pydantic-settings>=2.0.0", "guardrails-ai>=0.5.10",