diff --git a/RAGManager/app/core/config.py b/RAGManager/app/core/config.py index b96d119..264d591 100644 --- a/RAGManager/app/core/config.py +++ b/RAGManager/app/core/config.py @@ -13,7 +13,7 @@ class Settings(BaseSettings): minio_access_key: str minio_secret_key: str minio_bucket: str - minio_secure: bool = True + minio_use_ssl: bool = True # OpenAI Configuration openai_api_key: str @@ -21,6 +21,13 @@ class Settings(BaseSettings): # Database Configuration database_url: str + # RabbitMQ Configuration + rabbitmq_user: str + rabbitmq_password: str + rabbitmq_host: str = "localhost" + rabbitmq_port: int = 5672 + rabbitmq_queue_name: str = "document.process" + # Chunking Configuration chunk_size: int = 1000 chunk_overlap: int = 200 @@ -67,6 +74,15 @@ class Settings(BaseSettings): extra="ignore", ) + @property + def rabbitmq_url(self) -> str: + """Returns the RabbitMQ connection URL with URL-encoded credentials.""" + from urllib.parse import quote_plus + + encoded_user = quote_plus(self.rabbitmq_user) + encoded_password = quote_plus(self.rabbitmq_password) + return f"amqp://{encoded_user}:{encoded_password}@{self.rabbitmq_host}:{self.rabbitmq_port}/" + settings = Settings() diff --git a/RAGManager/app/core/rabbitmq.py b/RAGManager/app/core/rabbitmq.py new file mode 100644 index 0000000..53b5b26 --- /dev/null +++ b/RAGManager/app/core/rabbitmq.py @@ -0,0 +1,99 @@ +import json +import logging +from typing import Callable, Optional + +import pika +from pika.exceptions import AMQPConnectionError + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class RabbitMQConnection: + """Handles connection and operations with RabbitMQ""" + + def __init__(self): + self.connection: Optional[pika.BlockingConnection] = None + self.channel: Optional[pika.channel.Channel] = None + + def connect(self): + """Establishes connection with RabbitMQ""" + try: + url = settings.rabbitmq_url + logger.info(f"Connecting to RabbitMQ at {settings.rabbitmq_host}:{settings.rabbitmq_port}") + logger.debug( + f"RabbitMQ URL: amqp://{settings.rabbitmq_user}:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/" + ) + + self.connection = pika.BlockingConnection(pika.URLParameters(url)) + self.channel = self.connection.channel() + logger.info("Connected to RabbitMQ") + except AMQPConnectionError as e: + logger.error(f"Failed to connect to RabbitMQ: {e}") + logger.error(f"Configured host: {settings.rabbitmq_host}") + logger.error(f"Configured port: {settings.rabbitmq_port}") + raise + except Exception as e: + logger.error(f"Unexpected error connecting to RabbitMQ: {e}") + logger.error(f"Error type: {type(e).__name__}") + raise + + def close(self): + """Closes the connection""" + if self.channel and not self.channel.is_closed: + self.channel.close() + logger.info("RabbitMQ channel closed") + if self.connection and not self.connection.is_closed: + self.connection.close() + logger.info("RabbitMQ connection closed") + + def declare_queue(self, queue_name: str, durable: bool = True): + """Declares a queue""" + if not self.channel: + self.connect() + self.channel.queue_declare(queue=queue_name, durable=durable) + logger.info(f"Queue '{queue_name}' declared") + + def consume_messages(self, queue_name: str, callback: Callable): + """ + Start consuming messages from the queue. + + Args: + queue_name: Name of the queue to consume from + callback: Callback function to process messages + """ + if not self.channel: + self.connect() + + # Declare queue (idempotent operation) + self.declare_queue(queue_name) + + # Set QoS to process one message at a time + self.channel.basic_qos(prefetch_count=1) + + # Start consuming + self.channel.basic_consume( + queue=queue_name, + on_message_callback=callback, + auto_ack=False, # Manual acknowledgment + ) + + logger.info(f"Started consuming messages from queue '{queue_name}'") + logger.info("Waiting for messages. To exit press CTRL+C") + + try: + self.channel.start_consuming() + except KeyboardInterrupt: + logger.info("Interrupted by user") + self.channel.stop_consuming() + except Exception as e: + logger.error(f"Error while consuming messages: {e}") + raise + finally: + self.close() + + +# Global instance +rabbitmq = RabbitMQConnection() + diff --git a/RAGManager/app/services/minio_client.py b/RAGManager/app/services/minio_client.py index 2cfa106..17843ed 100644 --- a/RAGManager/app/services/minio_client.py +++ b/RAGManager/app/services/minio_client.py @@ -1,6 +1,7 @@ # MinIO client configuration and utilities. import logging +from urllib.parse import urlparse import certifi import urllib3 @@ -14,6 +15,19 @@ def get_minio_client() -> Minio: """Create a MinIO client with proper timeout and retry configuration.""" + # Parse endpoint to extract host and port (urlparse strips the scheme automatically) + parsed = urlparse(settings.minio_endpoint) + endpoint = parsed.netloc or parsed.path + + # Validate that the endpoint is not empty + if not endpoint or endpoint.strip() == "": + error_msg = ( + f"Invalid MinIO endpoint configuration: '{settings.minio_endpoint}'. " + "Endpoint must be a valid host or host:port (e.g., 'localhost:9000')" + ) + logger.error(error_msg) + raise ValueError(error_msg) + # Configure timeout: 10s connect, 30s read timeout = UrllibTimeout(connect=10, read=30) @@ -34,10 +48,10 @@ def get_minio_client() -> Minio: ) return Minio( - endpoint=settings.minio_endpoint, + endpoint=endpoint, access_key=settings.minio_access_key, secret_key=settings.minio_secret_key, - secure=settings.minio_secure, + secure=settings.minio_use_ssl, http_client=http_client, ) diff --git a/RAGManager/app/workers/__init__.py b/RAGManager/app/workers/__init__.py new file mode 100644 index 0000000..cd00fa7 --- /dev/null +++ b/RAGManager/app/workers/__init__.py @@ -0,0 +1,2 @@ +"""Workers package for background processing tasks.""" + diff --git a/RAGManager/app/workers/pdf_processor_consumer.py b/RAGManager/app/workers/pdf_processor_consumer.py new file mode 100644 index 0000000..01dcc4d --- /dev/null +++ b/RAGManager/app/workers/pdf_processor_consumer.py @@ -0,0 +1,121 @@ +"""RabbitMQ consumer for processing PDFs from MinIO events.""" + +import json +import logging +from urllib.parse import unquote + +from app.core.config import settings +from app.core.rabbitmq import RabbitMQConnection +from app.services.pipeline import process_pdf_pipeline + +logger = logging.getLogger(__name__) + + +def extract_pdf_path(message_body: dict) -> str: + """ + Extract the PDF path from MinIO event message. + + Args: + message_body: Parsed JSON message from MinIO + + Returns: + str: Decoded object path (e.g., "rag-docs/file.pdf") + + Raises: + ValueError: If message structure is invalid + """ + records = message_body.get("Records", []) + if not records: + raise ValueError("No Records found in message") + + # Extract key from Records[0].s3.object.key + try: + object_key = records[0]["s3"]["object"]["key"] + except (KeyError, IndexError) as e: + raise ValueError(f"Invalid message structure: {e}") from e + + # URL decode: rag-docs%2Farchivo.pdf -> rag-docs/archivo.pdf + decoded_path = unquote(object_key) + + return decoded_path + + +def message_callback(ch, method, properties, body): + """ + Callback function to process RabbitMQ messages. + + Args: + ch: Channel + method: Method + properties: Properties + body: Message body (bytes) + """ + try: + # Parse JSON message + message = json.loads(body) + logger.info(f"Received message from RabbitMQ") + logger.debug(f"Message content: {message}") + + # Extract PDF path + pdf_path = extract_pdf_path(message) + logger.info(f"Extracted PDF path: {pdf_path}") + + # Only process PDFs + if not pdf_path.lower().endswith('.pdf'): + logger.info(f"Skipping non-PDF file: {pdf_path}") + ch.basic_ack(delivery_tag=method.delivery_tag) + return + + # Call the existing pipeline + logger.info(f"Starting PDF processing pipeline for: {pdf_path}") + document_id = process_pdf_pipeline(pdf_path) + logger.info(f"PDF processed successfully: {pdf_path} -> Document ID: {document_id}") + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info(f"Message acknowledged for: {pdf_path}") + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON message: {e}") + # NACK without requeue - malformed messages won't be fixed by retrying + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + except ValueError as e: + logger.error(f"Invalid message structure: {e}") + # NACK without requeue - invalid structure won't be fixed by retrying + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + except Exception as e: + logger.error(f"Error processing message: {e}", exc_info=True) + # NACK without requeue to avoid infinite loops + # In production, consider implementing a dead-letter queue + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + + +def start_consumer(): + """ + Start the RabbitMQ consumer to process PDF files. + + This function runs in a blocking loop and should be executed + in a separate thread or process. + """ + logger.info("Starting PDF processor consumer") + + try: + # Create RabbitMQ connection + rabbitmq = RabbitMQConnection() + rabbitmq.connect() + + # Start consuming messages + queue_name = settings.rabbitmq_queue_name + logger.info(f"Consuming messages from queue: {queue_name}") + + rabbitmq.consume_messages( + queue_name=queue_name, + callback=message_callback + ) + + except KeyboardInterrupt: + logger.info("Consumer interrupted by user") + except Exception as e: + logger.error(f"Fatal error in consumer: {e}", exc_info=True) + raise + diff --git a/RAGManager/main.py b/RAGManager/main.py index dd6914a..100a13d 100644 --- a/RAGManager/main.py +++ b/RAGManager/main.py @@ -1,9 +1,12 @@ import logging +import threading + from fastapi import FastAPI from app.api.routes import router as api_router from app.api.routes.base import router as base_router from app.core.database_connection import init_db +from app.workers.pdf_processor_consumer import start_consumer # Configure logging logging.basicConfig( @@ -21,10 +24,28 @@ @app.on_event("startup") async def startup_event(): - """Initialize database on startup.""" + """Initialize database and start RabbitMQ consumer on startup.""" try: init_db() logging.info("Database initialized successfully") + + # Start RabbitMQ consumer in a separate daemon thread + consumer_thread = threading.Thread(target=start_consumer, daemon=True) + consumer_thread.start() + logging.info("RabbitMQ consumer started successfully") + except Exception as e: - logging.error(f"Failed to initialize database: {e}") - raise \ No newline at end of file + logging.error(f"Failed to initialize: {e}") + raise + + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +@app.get("/health") +def health_check(): + return {"message": "200 corriendo..."} + + diff --git a/RAGManager/pyproject.toml b/RAGManager/pyproject.toml index 4525963..aa77850 100644 --- a/RAGManager/pyproject.toml +++ b/RAGManager/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "minio>=7.2.20", "presidio-analyzer>=2.2.360", "presidio-anonymizer>=2.2.360", + "pika>=1.3.0", ] [project.optional-dependencies]