# Feat/rag rmq integration #55
Changes from all commits: 1ab9ae1, 1cadeef, f25b79e, 86c2457, b397fdf
New file (`@@ -0,0 +1,99 @@`); only the opening line of this file is visible in the excerpt:

```python
import json
```
**Copilot (AI)** · Dec 17, 2025

The RabbitMQ connection credentials are logged in plain debug output, which could expose sensitive information if debug logging is enabled in production. The password should be redacted in all log messages. Consider using `***` for all credential fields in the debug output, not just the password on line 26.

Suggested change:

```diff
-f"RabbitMQ URL: amqp://{settings.rabbitmq_user}:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/"
+f"RabbitMQ URL: amqp://***:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/"
```
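A minimal, stdlib-only sketch of the redaction the comment asks for. The URL shape `amqp://user:pass@host:port/` is assumed, and the helper name is illustrative:

```python
import re

def redact_amqp_url(url: str) -> str:
    """Replace the user:password portion of an AMQP URL with ***:***."""
    # Matches everything between "amqp://" and the "@" host separator
    return re.sub(r"(amqp://)[^@/]+@", r"\1***:***@", url)

print(redact_amqp_url("amqp://guest:s3cret@localhost:5672/"))
# amqp://***:***@localhost:5672/
```

Logging only the redacted form keeps debug output safe regardless of the configured log level.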
**Copilot (AI)** · Dec 17, 2025

The RabbitMQ queue is declared with `durable=True` by default, but there is no corresponding message-persistence configuration. Messages published to this queue should have `delivery_mode=2` to make them persistent; otherwise they could be lost if RabbitMQ crashes, even though the queue is durable. Consider documenting this requirement or configuring persistence at the publisher level.

Suggested change:

```diff
-    """Declares a queue"""
+    """
+    Declares a queue.
+
+    Note:
+        This method declares the queue as durable by default so that the queue
+        itself survives RabbitMQ restarts. Durability of the queue does not
+        automatically make messages persistent. Any publisher sending messages
+        to this queue that must survive broker restarts should set
+        `delivery_mode=2` (for example via `pika.BasicProperties`) when
+        publishing.
+    """
```
**Copilot (AI)** · Dec 17, 2025

The QoS setting `prefetch_count=1` will process messages sequentially, one at a time. While this ensures message ordering and prevents overwhelming the system, it may be inefficient for the PDF-processing workload. Consider whether parallel processing of multiple PDFs would be beneficial; if so, increase the `prefetch_count` or run multiple consumer instances.

Suggested change:

```diff
-        # Set QoS to process one message at a time
-        self.channel.basic_qos(prefetch_count=1)
+        # Set QoS prefetch count (default to processing one message at a time)
+        prefetch_count = getattr(settings, "rabbitmq_prefetch_count", 1)
+        logger.info(f"Setting RabbitMQ QoS prefetch_count={prefetch_count}")
+        self.channel.basic_qos(prefetch_count=prefetch_count)
```
**Copilot (AI)** · Dec 17, 2025

The global `RabbitMQConnection` instance is created at module level, but the `start_consumer` function creates a new instance. This could lead to confusion about connection management and makes it unclear which instance should be used. Consider either using the global instance or removing it if it is not needed.

Referenced lines:

```python
# Global instance
rabbitmq = RabbitMQConnection()
```
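One way to resolve the ambiguity is to expose a single lazily created instance and have every caller (including `start_consumer`) go through it. A pure-Python sketch with a stand-in class; the accessor name is illustrative:

```python
class RabbitMQConnection:
    """Stand-in for app.core.rabbitmq.RabbitMQConnection."""

_connection = None

def get_rabbitmq() -> RabbitMQConnection:
    """Return the shared connection, creating it on first use."""
    global _connection
    if _connection is None:
        _connection = RabbitMQConnection()
    return _connection

assert get_rabbitmq() is get_rabbitmq()  # every caller sees one instance
```

This avoids constructing a connection as an import side effect while still giving the module one well-defined instance.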
New file (`@@ -0,0 +1,2 @@`):

```python
"""Workers package for background processing tasks."""
```
New file (`@@ -0,0 +1,121 @@`):

```python
"""RabbitMQ consumer for processing PDFs from MinIO events."""

import json
import logging
from urllib.parse import unquote

from app.core.config import settings
from app.core.rabbitmq import RabbitMQConnection
from app.services.pipeline import process_pdf_pipeline

logger = logging.getLogger(__name__)


def extract_pdf_path(message_body: dict) -> str:
    """
    Extract the PDF path from a MinIO event message.

    Args:
        message_body: Parsed JSON message from MinIO

    Returns:
        str: Decoded object path (e.g., "rag-docs/file.pdf")

    Raises:
        ValueError: If the message structure is invalid
    """
    records = message_body.get("Records", [])
    if not records:
        raise ValueError("No Records found in message")

    # Extract key from Records[0].s3.object.key
    try:
        object_key = records[0]["s3"]["object"]["key"]
    except (KeyError, IndexError) as e:
        raise ValueError(f"Invalid message structure: {e}") from e

    # URL decode: rag-docs%2Farchivo.pdf -> rag-docs/archivo.pdf
    decoded_path = unquote(object_key)

    return decoded_path


def message_callback(ch, method, properties, body):
    """
    Callback function to process RabbitMQ messages.

    Args:
        ch: Channel
        method: Method
        properties: Properties
        body: Message body (bytes)
    """
    try:
        # Parse JSON message
        message = json.loads(body)
        logger.info("Received message from RabbitMQ")
        logger.debug(f"Message content: {message}")

        # Extract PDF path
        pdf_path = extract_pdf_path(message)
        logger.info(f"Extracted PDF path: {pdf_path}")

        # Only process PDFs
        if not pdf_path.lower().endswith('.pdf'):
            logger.info(f"Skipping non-PDF file: {pdf_path}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        # Call the existing pipeline
        logger.info(f"Starting PDF processing pipeline for: {pdf_path}")
        document_id = process_pdf_pipeline(pdf_path)
        logger.info(f"PDF processed successfully: {pdf_path} -> Document ID: {document_id}")

        # Acknowledge the message
        ch.basic_ack(delivery_tag=method.delivery_tag)
        logger.info(f"Message acknowledged for: {pdf_path}")

    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON message: {e}")
        # NACK without requeue - malformed messages won't be fixed by retrying
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except ValueError as e:
        logger.error(f"Invalid message structure: {e}")
        # NACK without requeue - invalid structure won't be fixed by retrying
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        logger.error(f"Error processing message: {e}", exc_info=True)
        # NACK without requeue to avoid infinite loops
        # In production, consider implementing a dead-letter queue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


def start_consumer():
    """
    Start the RabbitMQ consumer to process PDF files.

    This function runs in a blocking loop and should be executed
    in a separate thread or process.
    """
    logger.info("Starting PDF processor consumer")

    try:
        # Create RabbitMQ connection
        rabbitmq = RabbitMQConnection()
        rabbitmq.connect()

        # Start consuming messages
        queue_name = settings.rabbitmq_queue_name
        logger.info(f"Consuming messages from queue: {queue_name}")

        rabbitmq.consume_messages(
            queue_name=queue_name,
            callback=message_callback
        )

    except KeyboardInterrupt:
        logger.info("Consumer interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error in consumer: {e}", exc_info=True)
        raise
```
Modified file (`@@ -1,9 +1,12 @@`, `@@ -21,10 +24,28 @@`):

```diff
@@ -1,9 +1,12 @@
 import logging
+import threading

 from fastapi import FastAPI

 from app.api.routes import router as api_router
 from app.api.routes.base import router as base_router
 from app.core.database_connection import init_db
+from app.workers.pdf_processor_consumer import start_consumer

 # Configure logging
 logging.basicConfig(
@@ -21,10 +24,28 @@

 @app.on_event("startup")
 async def startup_event():
-    """Initialize database on startup."""
+    """Initialize database and start RabbitMQ consumer on startup."""
     try:
         init_db()
         logging.info("Database initialized successfully")
+
+        # Start RabbitMQ consumer in a separate daemon thread
+        consumer_thread = threading.Thread(target=start_consumer, daemon=True)
+        consumer_thread.start()
+        logging.info("RabbitMQ consumer started successfully")
+
     except Exception as e:
-        logging.error(f"Failed to initialize database: {e}")
-        raise
+        logging.error(f"Failed to initialize: {e}")
+        raise


 @app.get("/")
 async def root():
     return {"message": "Hello World"}


 @app.get("/health")
 def health_check():
     return {"message": "200 corriendo..."}
```
Suggested change (translate the health-check message to English):

```diff
-    return {"message": "200 corriendo..."}
+    return {"message": "200 running..."}
```
The `rabbitmq_url` property imports `urllib.parse.quote_plus` inside the method. The import statement runs every time the property is accessed; moving it to the top of the file with the other imports is cheaper and more idiomatic.
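A sketch of the suggested fix, with the import at module level. The settings field names mirror those visible elsewhere in the PR (`rabbitmq_user`, `rabbitmq_host`, `rabbitmq_port`), but the function shape is illustrative:

```python
from urllib.parse import quote_plus

def build_amqp_url(user: str, password: str, host: str, port: int) -> str:
    """Build an AMQP URL, escaping special characters in the credentials."""
    # quote_plus handles characters such as '@' and '/' in passwords,
    # which would otherwise corrupt the URL structure
    return f"amqp://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/"

print(build_amqp_url("guest", "p@ss/word", "localhost", 5672))
# amqp://guest:p%40ss%2Fword@localhost:5672/
```

Escaping the credentials is also why `quote_plus` belongs here at all; a raw f-string interpolation would break for passwords containing URL-reserved characters.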