Skip to content
This repository was archived by the owner on Jan 2, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-rabbit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ djangorestframework-simplejwt
djangorestframework-api-key==2.*
psycopg2
django-silk
asgiref
79 changes: 40 additions & 39 deletions server/mq/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,64 @@
import logging

import mq
from asgiref.sync import sync_to_async
from pika import BasicProperties
from resume_review.models import Resume

logger = logging.getLogger(__name__)


@mq.consumer(
queue="reviewed-feedback",
routing_key="reviewed-feedback",
queue="server.verified-email",
routing_key="server.verified-email",
exchange="swecc-server-exchange",
)
async def reviewed_feedback_callback(
async def verified_email_callback(
body,
properties: BasicProperties,
):
expected_fields = ["user_id", "resume_id", "feedback", "error"]
logger.info(f"Received verified email message: {body}")

# Validate body
try:
data = json.loads(body)
for field in expected_fields:
if field not in data:
raise ValueError(f"Field {field} not found in message")
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON: {e}")
return
except ValueError as e:
logger.error(f"Error validating message: {e}")
return

if data["error"]:
logger.info(f"Received error message: {data['error']}")
@mq.consumer(queue="server.reviewed-resume", exchange="ai", routing_key="reviewed")
async def reviewed_feedback(
body: bytes,
properties: BasicProperties,
):
body = body.decode("utf-8")
logger.info(f"Received reviewed resume message: {body}")
body = json.loads(body)
Comment thread
Advayp marked this conversation as resolved.
Outdated
feedback = body.get("feedback", None)
key = body.get("key", None)

if feedback is None or key is None:
logger.error("Feedback or key not found in message")
return

resume_object = Resume.objects.filter(id=data["resume_id"]).first()
user_id, resume_id, file_name = key.split("-")

if not resume_object:
logger.error(
f"Resume with ID {data['resume_id']} not found"
) # Technically unreachable, though can't hurt to check for
return
@sync_to_async
def perform_database_operations():
resume_object = Resume.objects.filter(id=int(resume_id)).first()
if not resume_object:
logger.error(f"Resume with ID {resume_id} not found")
return False

resume_object.feedback = data["feedback"]
resume_object.save()
if resume_object.member.id != int(user_id):
logger.error(
f"Resume with ID {resume_id} does not belong to user {user_id}"
)
return False

logger.info(
f"Feedback for resume {data['resume_id']} updated with value {data['feedback']}"
)
if resume_object.file_name != file_name:
logger.error(
f"Resume with ID {resume_id} does not have file name {file_name}"
)
return False

resume_object.feedback = feedback
resume_object.save()

@mq.consumer(
queue="server.verified-email",
routing_key="server.verified-email",
exchange="swecc-server-exchange",
)
async def verified_email_callback(
body,
properties: BasicProperties,
):
logger.info(f"Received verified email message: {body}")
successful = await perform_database_operations()
if successful:
logger.info(f"Feedback for resume {resume_id} updated with value {feedback}")