diff --git a/requirements-rabbit.txt b/requirements-rabbit.txt index 6e6bcca..f3b6c4e 100644 --- a/requirements-rabbit.txt +++ b/requirements-rabbit.txt @@ -6,3 +6,4 @@ djangorestframework-simplejwt djangorestframework-api-key==2.* psycopg2 django-silk +asgiref diff --git a/server/mq/consumers.py b/server/mq/consumers.py index af5b985..08b0ae4 100644 --- a/server/mq/consumers.py +++ b/server/mq/consumers.py @@ -2,6 +2,7 @@ import logging import mq +from asgiref.sync import sync_to_async from pika import BasicProperties from resume_review.models import Resume @@ -9,56 +10,60 @@ @mq.consumer( - queue="reviewed-feedback", - routing_key="reviewed-feedback", + queue="server.verified-email", + routing_key="server.verified-email", exchange="swecc-server-exchange", ) -async def reviewed_feedback_callback( +async def verified_email_callback( body, properties: BasicProperties, ): - expected_fields = ["user_id", "resume_id", "feedback", "error"] + logger.info(f"Received verified email message: {body}") + - # Validate body - try: - data = json.loads(body) - for field in expected_fields: - if field not in data: - raise ValueError(f"Field {field} not found in message") +@mq.consumer(queue="server.reviewed-resume", exchange="ai", routing_key="reviewed") +async def reviewed_feedback( + body: bytes, + properties: BasicProperties, +): + body = body.decode("utf-8") + logger.info(f"Received reviewed resume message: {body}") + try: + body = json.loads(body) except json.JSONDecodeError as e: - logger.error(f"Error decoding JSON: {e}") - return - except ValueError as e: - logger.error(f"Error validating message: {e}") + logger.error(f"Failed to decode JSON: {e}") return + feedback = body.get("feedback", None) + key = body.get("key", None) - if data["error"]: - logger.info(f"Received error message: {data['error']}") + if feedback is None or key is None: + logger.error("Feedback or key not found in message") return - resume_object = Resume.objects.filter(id=data["resume_id"]).first() + user_id, resume_id, file_name = key.split("-") - if not resume_object: - logger.error( - f"Resume with ID {data['resume_id']} not found" - ) # Technically unreachable, though can't hurt to check for - return + @sync_to_async + def perform_database_operations(): + resume_object = Resume.objects.filter(id=int(resume_id)).first() + if not resume_object: + logger.error(f"Resume with ID {resume_id} not found") + return False - resume_object.feedback = data["feedback"] - resume_object.save() + if resume_object.member.id != int(user_id): + logger.error( + f"Resume with ID {resume_id} does not belong to user {user_id}" + ) + return False - logger.info( - f"Feedback for resume {data['resume_id']} updated with value {data['feedback']}" - ) + if resume_object.file_name != file_name: + logger.error( + f"Resume with ID {resume_id} does not have file name {file_name}" + ) + return False + resume_object.feedback = feedback + resume_object.save() -@mq.consumer( - queue="server.verified-email", - routing_key="server.verified-email", - exchange="swecc-server-exchange", -) -async def verified_email_callback( - body, - properties: BasicProperties, -): - logger.info(f"Received verified email message: {body}") + successful = await perform_database_operations() + if successful: + logger.info(f"Feedback for resume {resume_id} updated with value {feedback}")