Pull request overview
This PR integrates RabbitMQ message queue functionality into the RAG Manager service to enable asynchronous PDF processing. The integration allows the service to consume MinIO events from a RabbitMQ queue and automatically process uploaded PDF files through the existing pipeline without requiring direct API calls.
Key Changes:
- Added RabbitMQ consumer that processes MinIO S3 bucket events for automatic PDF processing
- Integrated the consumer as a daemon thread that starts on application startup
- Added configuration management for RabbitMQ connection parameters with URL encoding support
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| RAGManager/pyproject.toml | Added pika>=1.3.0 dependency for RabbitMQ client support |
| RAGManager/main.py | Integrated RabbitMQ consumer startup in daemon thread; added health check endpoints |
| RAGManager/app/workers/pdf_processor_consumer.py | New consumer implementation for processing MinIO events and triggering PDF pipeline |
| RAGManager/app/workers/__init__.py | New workers package initialization file |
| RAGManager/app/core/rabbitmq.py | New RabbitMQ connection handler with connection management and message consumption |
| RAGManager/app/core/config.py | Added RabbitMQ configuration settings and connection URL property with credential encoding |
```python
logger.info("Starting PDF processor consumer")

try:
    # Create RabbitMQ connection
    rabbitmq = RabbitMQConnection()
    rabbitmq.connect()

    # Start consuming messages
    queue_name = settings.rabbitmq_queue_name
    logger.info(f"Consuming messages from queue: {queue_name}")

    rabbitmq.consume_messages(
        queue_name=queue_name,
        callback=message_callback
    )

except KeyboardInterrupt:
    logger.info("Consumer interrupted by user")
except Exception as e:
    logger.error(f"Fatal error in consumer: {e}", exc_info=True)
    raise
```
The start_consumer function does not handle connection failures gracefully after startup. If the RabbitMQ connection drops during operation, the consumer will crash and the exception will be raised in the daemon thread, but there's no mechanism to restart it. Consider implementing automatic reconnection logic with exponential backoff to make the consumer more resilient to network issues.
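A reconnect loop with exponential backoff could be sketched as below. `run_with_reconnect` is a hypothetical wrapper, not part of the PR; it assumes a blocking callable like the existing `start_consumer` that raises on connection loss.

```python
import logging
import time

logger = logging.getLogger(__name__)


def run_with_reconnect(start_fn, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Run a blocking consumer, reconnecting with exponential backoff on failure.

    start_fn is expected to block while consuming and raise if the
    connection drops; a clean return means the consumer was stopped.
    """
    delay = base_delay
    while True:
        try:
            start_fn()
            return  # consumer exited cleanly
        except KeyboardInterrupt:
            logger.info("Consumer interrupted by user")
            return
        except Exception as e:
            logger.error(f"Consumer failed: {e}; reconnecting in {delay:.1f}s")
            sleep(delay)
            delay = min(delay * 2, max_delay)  # back off up to max_delay
```

The daemon thread would then target `lambda: run_with_reconnect(start_consumer)` instead of `start_consumer` directly.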
```python
url = settings.rabbitmq_url
logger.info(f"Connecting to RabbitMQ at {settings.rabbitmq_host}:{settings.rabbitmq_port}")
logger.debug(
    f"RabbitMQ URL: amqp://{settings.rabbitmq_user}:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/"
```
The RabbitMQ connection credentials are logged in debug output, which could expose sensitive information if debug logging is enabled in production. Credentials should be redacted in all log messages: use '***' for every credential field in the debug output, not just the password on line 26.
Suggested change:
```diff
- f"RabbitMQ URL: amqp://{settings.rabbitmq_user}:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/"
+ f"RabbitMQ URL: amqp://***:***@{settings.rabbitmq_host}:{settings.rabbitmq_port}/"
```
```python
consumer_thread = threading.Thread(target=start_consumer, daemon=True)
consumer_thread.start()
```
The RabbitMQ consumer is started in a daemon thread without any error handling or graceful shutdown mechanism. If the consumer thread crashes or the connection fails after startup, the application will continue running but will silently stop processing messages. Consider storing the thread reference and implementing proper error monitoring or a shutdown event handler to detect and log consumer failures.
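One way to make such failures visible is to wrap the thread target so any crash is logged and flagged. This is a hypothetical sketch; `consumer_failed` and `start_supervised_consumer` are not names from the PR, and the event could back the service's health check endpoint.

```python
import logging
import threading

logger = logging.getLogger(__name__)

# Set when the consumer thread dies; a /health endpoint could check this.
consumer_failed = threading.Event()


def supervised(target):
    """Wrap a thread target so crashes are logged instead of vanishing silently."""
    def runner():
        try:
            target()
        except Exception:
            logger.exception("RabbitMQ consumer thread crashed")
            consumer_failed.set()
    return runner


def start_supervised_consumer(target):
    """Start the consumer in a named daemon thread and keep the reference."""
    thread = threading.Thread(
        target=supervised(target), daemon=True, name="rabbitmq-consumer"
    )
    thread.start()
    return thread
```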
```python
consumer_thread = threading.Thread(target=start_consumer, daemon=True)
consumer_thread.start()
logging.info("RabbitMQ consumer started successfully")
```
The RabbitMQ connection is not being closed properly on application shutdown. The consumer thread is a daemon thread that will be forcefully terminated when the main process exits, potentially leaving the RabbitMQ connection open. Consider implementing a shutdown event handler that calls rabbitmq.close() to ensure graceful cleanup of resources.
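A small handle object could own both the connection and the thread so a shutdown hook has one thing to call. This is a sketch with hypothetical names; the FastAPI wiring in the trailing comment assumes the app in main.py exposes lifecycle events.

```python
import logging

logger = logging.getLogger(__name__)


class ConsumerHandle:
    """Holds the consumer thread and connection so shutdown can clean up."""

    def __init__(self, connection, thread):
        self.connection = connection
        self.thread = thread

    def close(self, join_timeout=5.0):
        """Close the connection (unblocking the consume loop) and join the thread."""
        try:
            self.connection.close()
        except Exception:
            logger.exception("Error closing RabbitMQ connection")
        self.thread.join(timeout=join_timeout)


# In main.py this could be wired to the app lifecycle, e.g.:
#
# @app.on_event("shutdown")
# def shutdown() -> None:
#     consumer_handle.close()
```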
```python
# Set QoS to process one message at a time
self.channel.basic_qos(prefetch_count=1)
```
The QoS setting prefetch_count=1 will process messages sequentially one at a time. While this ensures message ordering and prevents overwhelming the system, it may be inefficient for the PDF processing workload. Consider whether parallel processing of multiple PDFs would be beneficial, and if so, increase the prefetch_count or run multiple consumer instances.
Suggested change:
```diff
- # Set QoS to process one message at a time
- self.channel.basic_qos(prefetch_count=1)
+ # Set QoS prefetch count (default to processing one message at a time)
+ prefetch_count = getattr(settings, "rabbitmq_prefetch_count", 1)
+ logger.info(f"Setting RabbitMQ QoS prefetch_count={prefetch_count}")
+ self.channel.basic_qos(prefetch_count=prefetch_count)
```
```python
def message_callback(ch, method, properties, body):
    """
    Callback function to process RabbitMQ messages.

    Args:
        ch: Channel
        method: Method
        properties: Properties
        body: Message body (bytes)
```
The message callback function has generic parameter names (ch, method, properties, body) that don't provide clear type information. While these are conventional in pika, consider adding type hints to improve code clarity and enable better IDE support. For example: ch: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes.
```python
@property
def rabbitmq_url(self) -> str:
    """Returns the RabbitMQ connection URL with URL-encoded credentials."""
    from urllib.parse import quote_plus
```
The rabbitmq_url property imports urllib.parse.quote_plus inside the method. This import should be moved to the module level for better performance, as the import will be executed every time the property is accessed. Consider moving it to the top of the file with other imports.
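The refactor could look roughly like this. The `Settings` class here is a stripped-down stand-in for the PR's config object (the real one presumably uses pydantic); only the field names mirror the PR.

```python
from urllib.parse import quote_plus  # module-level: imported once at load time


class Settings:
    """Stand-in for the PR's settings object, with RabbitMQ fields only."""
    rabbitmq_user: str = "rag"
    rabbitmq_password: str = "p@ss:word"
    rabbitmq_host: str = "localhost"
    rabbitmq_port: int = 5672

    @property
    def rabbitmq_url(self) -> str:
        """RabbitMQ connection URL with URL-encoded credentials."""
        user = quote_plus(self.rabbitmq_user)
        password = quote_plus(self.rabbitmq_password)
        return f"amqp://{user}:{password}@{self.rabbitmq_host}:{self.rabbitmq_port}/"
```

The encoding matters because characters like `@` and `:` in a password would otherwise break AMQP URL parsing.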
```python
document_id = process_pdf_pipeline(pdf_path)
logger.info(f"PDF processed successfully: {pdf_path} -> Document ID: {document_id}")

# Acknowledge the message
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Message acknowledged for: {pdf_path}")
```
The callback function acknowledges messages even when the PDF processing pipeline only returns a mock document_id (when NotImplementedError is caught). This means messages will be permanently removed from the queue even though the processing is not fully complete. Consider implementing a dead-letter queue or requeuing strategy for partial failures until the pipeline is fully implemented.
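A dead-letter setup mostly comes down to queue arguments. The helper and the queue/exchange names below are placeholders, not from the PR; the commented pika calls show roughly how the main queue would be declared so that rejected messages land in a dead-letter queue.

```python
def dead_letter_arguments(exchange: str, routing_key: str) -> dict:
    """Queue arguments that route rejected/expired messages to a dead-letter exchange."""
    return {
        "x-dead-letter-exchange": exchange,
        "x-dead-letter-routing-key": routing_key,
    }


# With pika, the declarations would look roughly like this
# (names here are illustrative placeholders):
#
# channel.exchange_declare(exchange="pdf-dlx", exchange_type="direct", durable=True)
# channel.queue_declare(queue="pdf-events.dlq", durable=True)
# channel.queue_bind(queue="pdf-events.dlq", exchange="pdf-dlx", routing_key="pdf-events")
# channel.queue_declare(
#     queue="pdf-events",
#     durable=True,
#     arguments=dead_letter_arguments("pdf-dlx", "pdf-events"),
# )
```

With this in place, the existing `basic_nack(..., requeue=False)` would move the message to `pdf-events.dlq` instead of discarding it.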
```python
except Exception as e:
    logger.error(f"Error processing message: {e}", exc_info=True)
    # NACK without requeue to avoid infinite loops
    # In production, consider implementing a dead-letter queue
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
```
The consumer lacks retry logic or circuit breaker patterns. If the process_pdf_pipeline encounters transient failures (e.g., temporary database unavailability), the message will be discarded (NACK without requeue). Consider implementing exponential backoff retries for transient errors before giving up, or using a dead-letter exchange for failed messages that can be retried later.
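An in-process retry for transient failures could be sketched as follows. `TransientError` and `process_with_retry` are hypothetical; the pipeline would need to raise (or be wrapped to raise) a distinguishable exception for retryable failures.

```python
import time


class TransientError(Exception):
    """Marker for failures worth retrying (e.g. temporary database unavailability)."""


def process_with_retry(process, max_retries=3, base_delay=0.5, sleep=time.sleep):
    """Call process(); retry TransientError with exponential backoff.

    Re-raises after max_retries so the caller can still NACK or
    dead-letter the message.
    """
    for attempt in range(max_retries + 1):
        try:
            return process()
        except TransientError:
            if attempt == max_retries:
                raise
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```

The callback would call `process_with_retry(lambda: process_pdf_pipeline(pdf_path))` and only NACK once the retries are exhausted.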
```diff
@@ -0,0 +1,99 @@
+import json
```
Import of 'json' is not used.
Suggested change:
```diff
- import json
```
…' and enhance endpoint validation in MinIO client setup.
No description provided.