Skip to content

Data Retrieval ‐ Requests and Their Help Categories

Pallavi Chitrada edited this page Mar 16, 2026 · 1 revision

Option 1: JOIN Query

Permissions

  • DB credentials (host, port, dbname, user, password) — find them in Parameter Store
  • SELECT privilege — all users have permission by default

Install Dependencies

pip install psycopg2-binary python-dotenv

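Store credentials in .env

Define DB_HOST, DB_PORT, DB_NAME, DB_USER and DB_PASSWORD in the .env file; these are the keys the script below reads.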

Python Script

Fetches all fields for a given user_id and req_id in one query. Pulls from: request + req_add_info + metadata tables.

import psycopg2
import os
from dotenv import load_dotenv
 
# Load key=value pairs from a local .env file into the process environment
# so the os.getenv() lookups in get_connection() can find the DB credentials.
load_dotenv()
 
 
def get_connection():
    """Open a new psycopg2 connection using the DB_* environment variables.

    Each setting is looked up with os.getenv, so a variable that is not set
    is passed to psycopg2 as None. The caller owns the returned connection
    and is responsible for closing it.
    """
    env_by_param = {
        "host": "DB_HOST",
        "port": "DB_PORT",
        "dbname": "DB_NAME",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
    }
    settings = {param: os.getenv(env) for param, env in env_by_param.items()}
    return psycopg2.connect(**settings)
 
 
def get_request_full_details(user_id: str, req_id: str):
    """Fetch one request and its additional-info answers with a single JOIN.

    Args:
        user_id: Value matched against request.req_user_id.
        req_id: Value matched against request.req_id.

    Returns:
        A dict with the base request columns plus an "additional_info" list
        holding one entry per question ({"question", "field_type",
        "answers"}). If no row matches, or any exception is raised while
        querying, a dict of the form {"error": <message>} is returned instead.
    """
    query = """
        SELECT
            -- From request table
            r.req_id,
            r.req_user_id,
            r.req_cat_id,
            r.req_subj,
            r.req_desc,
            r.req_loc,
            r.req_type_id,
            r.req_priority_id,
            r.req_status_id,
            r.iscalamity,
            r.submission_date,

            -- From req_add_info + metadata tables
            m.field_name_key   AS question,
            m.field_type,
            l.item_value       AS list_answer,
            rai.field_value

        FROM virginia_dev_saayam_rdbms.request r

        LEFT JOIN virginia_dev_saayam_rdbms.req_add_info rai
            ON r.req_id = rai.req_id

        LEFT JOIN virginia_dev_saayam_rdbms.req_add_info_metadata m
            ON rai.field_id = m.field_id

        LEFT JOIN virginia_dev_saayam_rdbms.list_item_metadata l
            ON rai.item_id = l.item_id

        WHERE r.req_user_id = %s
          AND r.req_id      = %s

        ORDER BY rai.field_id;
    """

    conn = None
    try:
        conn = get_connection()
        # The cursor context manager closes the cursor even when
        # execute()/fetchall() raises.
        with conn.cursor() as cur:
            cur.execute(query, (user_id, req_id))
            rows = cur.fetchall()
            cols = [desc[0] for desc in cur.description]

        if not rows:
            return {"error": "No data found for given user_id and req_id"}

        # Resolve every column position once instead of calling
        # cols.index() for each field of each row.
        col_pos = {name: idx for idx, name in enumerate(cols)}

        # Base request fields are identical on every joined row, so the
        # first row is enough.
        first = rows[0]
        base_fields = (
            "req_id",
            "req_user_id",
            "req_cat_id",
            "req_subj",
            "req_desc",
            "req_loc",
            "req_type_id",
            "req_priority_id",
            "req_status_id",
            "iscalamity",
        )
        result = {name: first[col_pos[name]] for name in base_fields}

        # A NULL timestamp stays None rather than becoming the string "None".
        submitted = first[col_pos["submission_date"]]
        result["submission_date"] = str(submitted) if submitted is not None else None

        # Group additional-info answers by question, keeping first-seen order.
        questions = {}
        for row in rows:
            field = row[col_pos["question"]]
            if field is None:
                # LEFT JOIN produced no question for this row.
                continue

            entry = questions.setdefault(
                field,
                {
                    "question"   : field,
                    "field_type" : row[col_pos["field_type"]],
                    "answers"    : [],
                },
            )

            # Prefer the list answer; fall back to the free-text answer.
            answer = row[col_pos["list_answer"]] or row[col_pos["field_value"]]
            if answer:
                entry["answers"].append(answer)

        result["additional_info"] = list(questions.values())
        return result

    except Exception as e:
        # Errors are reported to the caller as a dict rather than raised.
        return {"error": str(e)}

    finally:
        if conn:
            conn.close()
 
 
# Example usage
if __name__ == "__main__":
    import json

    # Demo run with sample IDs; prints the assembled dict as indented JSON.
    print(json.dumps(get_request_full_details("USER-1", "REQ-1"), indent=2))

Sample Output

{
  "req_id"          : "REQ-1",
  "req_user_id"     : "USER-1",
  "req_cat_id"      : "1.1",
  "req_subj"        : "Need food assistance",
  "req_desc"        : "Family of 4 needs weekly food support",
  "req_loc"         : "Virginia",
  "req_type_id"     : 1,
  "req_priority_id" : 2,
  "req_status_id"   : 1,
  "iscalamity"      : false,
  "submission_date" : "2024-01-15 10:30:00",
  "additional_info" : [
    {
      "question"   : "PREFERRED_MEAL_TYPE",
      "field_type" : "list",
      "answers"    : ["VEG", "NON-VEG"]
    },
    {
      "question"   : "HOUSEHOLD_SIZE",
      "field_type" : "int",
      "answers"    : ["4"]
    }
  ]
}

Option 2: Separate Queries + Python Merging

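Instead of a single JOIN across all four tables, run two separate queries — one for the base request row and one for its additional-info answers — and merge the results in Python. Use this when the tables are large and you want to avoid a heavy JOIN on the DB server.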

def get_request_full_details(user_id: str, req_id: str):
    """Fetch one request and its additional info using two separate queries.

    Args:
        user_id: Value matched against request.req_user_id.
        req_id: Value matched against request.req_id (and req_add_info.req_id).

    Returns:
        A dict of the selected request columns plus an "additional_info" list
        with one dict per req_add_info row. If no request row matches, returns
        {"error": "No data found for given user_id and req_id"}.

    Raises:
        Any exception raised while connecting or querying propagates to the
        caller; the connection is still closed.
    """
    conn = None
    try:
        conn = get_connection()
        # The cursor context manager closes the cursor even when a query fails.
        with conn.cursor() as cur:

            # Query 1: Get base request details
            cur.execute("""
                SELECT req_id, req_user_id, req_cat_id, req_subj,
                       req_desc, req_loc, req_status_id, req_priority_id,
                       req_type_id, iscalamity, submission_date
                FROM virginia_dev_saayam_rdbms.request
                WHERE req_user_id = %s AND req_id = %s
            """, (user_id, req_id))

            request_row = cur.fetchone()
            if request_row is None:
                # Without this guard dict(zip(cols, None)) raises TypeError.
                return {"error": "No data found for given user_id and req_id"}

            request_cols = [desc[0] for desc in cur.description]
            request_data = dict(zip(request_cols, request_row))

            # Query 2: Get additional info for the same req_id.
            # ORDER BY keeps answers to the same question together and makes
            # the output order deterministic.
            cur.execute("""
                SELECT rai.field_id, m.field_name_key, m.field_type,
                       rai.item_id, l.item_value, rai.field_value
                FROM virginia_dev_saayam_rdbms.req_add_info rai
                JOIN virginia_dev_saayam_rdbms.req_add_info_metadata m
                    ON rai.field_id = m.field_id
                LEFT JOIN virginia_dev_saayam_rdbms.list_item_metadata l
                    ON rai.item_id = l.item_id
                WHERE rai.req_id = %s
                ORDER BY rai.field_id
            """, (req_id,))

            add_info_rows = cur.fetchall()
            add_info_cols = [desc[0] for desc in cur.description]

        # Merge additional info into request data in Python
        request_data["additional_info"] = [
            dict(zip(add_info_cols, row)) for row in add_info_rows
        ]
        return request_data

    finally:
        if conn:
            conn.close()
 
 
# Example usage
if __name__ == "__main__":
    import json

    # default=str lets json.dumps print values it cannot encode natively
    # by converting them with str().
    details = get_request_full_details("USER-1", "REQ-1")
    print(json.dumps(details, default=str, indent=2))

Sample Output

{
  "req_id"          : "REQ-1",
  "req_user_id"     : "USER-1",
  "req_cat_id"      : "1.1",
  "req_subj"        : "Need food assistance",
  "req_desc"        : "Family of 4 needs support",
  "req_loc"         : "Virginia",
  "req_status_id"   : 1,
  "req_priority_id" : 2,
  "req_type_id"     : 1,
  "iscalamity"      : false,
  "submission_date" : "2024-01-15 10:30:00",
  "additional_info" : [
    {
      "field_id"       : "1.1.A",
      "field_name_key" : "PREFERRED_MEAL_TYPE",
      "field_type"     : "list",
      "item_id"        : "1.1.A.1",
      "item_value"     : "VEG",
      "field_value"    : null
    },
    {
      "field_id"       : "1.1.A",
      "field_name_key" : "PREFERRED_MEAL_TYPE",
      "field_type"     : "list",
      "item_id"        : "1.1.A.2",
      "item_value"     : "NON-VEG",
      "field_value"    : null
    },
    {
      "field_id"       : "1.1.C",
      "field_name_key" : "HOUSEHOLD_SIZE",
      "field_type"     : "int",
      "item_id"        : null,
      "item_value"     : null,
      "field_value"    : "4"
    }
  ]
}

Option 3: ORM (SQLAlchemy)

No raw SQL — relationships are defined once in models and reused everywhere.

Install Dependencies

pip install sqlalchemy psycopg2-binary

Models

# app/db/models.py
 
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, TIMESTAMP
from sqlalchemy.orm import relationship, declarative_base
 
# Declarative base class that every model in this module inherits from.
Base   = declarative_base()
# Database schema name: applied to each table via __table_args__ and used to
# qualify ForeignKey targets.
SCHEMA = "virginia_dev_saayam_rdbms"
 
 
# Static table — category hierarchy
# e.g. "1.1" → "FOOD_ASSISTANCE"
class HelpCategory(Base):
    # Maps to <SCHEMA>.help_categories; cat_id is the primary key.
    __tablename__  = "help_categories"
    __table_args__ = {"schema": SCHEMA}

    cat_id   = Column(String(50),  primary_key=True)
    cat_name = Column(String(100), nullable=False)
    cat_desc = Column(String(150), nullable=False)

    # Reverse sides of Request.category and ReqAddInfoMetadata.category.
    requests = relationship("Request",            back_populates="category")
    fields   = relationship("ReqAddInfoMetadata", back_populates="category")
 
 
# Static table — questions per category
# e.g. "1.1.A" → "PREFERRED_MEAL_TYPE" → "list"
class ReqAddInfoMetadata(Base):
    __tablename__  = "req_add_info_metadata"
    __table_args__ = {"schema": SCHEMA}
 
    field_id       = Column(String(70),  primary_key=True)
    field_name_key = Column(String(100))
    field_type     = Column(String(20))
    status         = Column(String(10),  default="active")
    cat_id         = Column(String(50),  ForeignKey(f"{SCHEMA}.help_categories.cat_id"))
 
    category = relationship("HelpCategory",     back_populates="fields")
    items    = relationship("ListItemMetadata", back_populates="field")
    answers  = relationship("ReqAddInfo",       back_populates="metadata")
 
 
# Static table — answer options for list-type fields
# e.g. "1.1.A.1" → "VEG"
class ListItemMetadata(Base):
    __tablename__  = "list_item_metadata"
    __table_args__ = {"schema": SCHEMA}
 
    item_id    = Column(String(100), primary_key=True)
    field_id   = Column(String(70),  ForeignKey(f"{SCHEMA}.req_add_info_metadata.field_id"))
    item_value = Column(String(100))
    item_type  = Column(String(20))
 
    field   = relationship("ReqAddInfoMetadata", back_populates="items")
    answers = relationship("ReqAddInfo",         back_populates="list_item")
 
 
# Lookup table — user details
class User(Base):
    __tablename__  = "users"
    __table_args__ = {"schema": SCHEMA}
 
    user_id    = Column(String(255), primary_key=True)
    user_name  = Column(String(100))
    user_email = Column(String(100))
 
    requests = relationship("Request", back_populates="user")
 
 
# Lookup table — e.g. OPEN, CLOSED, IN_PROGRESS
class RequestStatus(Base):
    __tablename__  = "request_status"
    __table_args__ = {"schema": SCHEMA}
 
    req_status_id = Column(Integer, primary_key=True)
    status_name   = Column(String(50))
 
    requests = relationship("Request", back_populates="status")
 
 
# Lookup table — e.g. LOW, MEDIUM, HIGH
class RequestPriority(Base):
    __tablename__  = "request_priority"
    __table_args__ = {"schema": SCHEMA}
 
    req_priority_id = Column(Integer, primary_key=True)
    priority_name   = Column(String(50))
 
    requests = relationship("Request", back_populates="priority")
 
 
# Lookup table — e.g. ASSISTANCE, DONATION
class RequestType(Base):
    __tablename__  = "request_type"
    __table_args__ = {"schema": SCHEMA}
 
    req_type_id = Column(Integer, primary_key=True)
    type_name   = Column(String(50))
 
    requests = relationship("Request", back_populates="type")
 
 
# Transactional table — submitted requests
# PARENT of ReqAddInfo — req_id is generated here first
class Request(Base):
    __tablename__  = "request"
    __table_args__ = {"schema": SCHEMA}
 
    req_id           = Column(String(255), primary_key=True)
    req_user_id      = Column(String(255), ForeignKey(f"{SCHEMA}.users.user_id"),                    nullable=False)
    req_for_id       = Column(Integer,     ForeignKey(f"{SCHEMA}.request_for.req_for_id"),           nullable=False)
    req_islead_id    = Column(Integer,     ForeignKey(f"{SCHEMA}.request_isleadvol.req_islead_id"),  nullable=False)
    req_cat_id       = Column(String(50),  ForeignKey(f"{SCHEMA}.help_categories.cat_id"),           nullable=False)
    req_type_id      = Column(Integer,     ForeignKey(f"{SCHEMA}.request_type.req_type_id"),         nullable=False)
    req_priority_id  = Column(Integer,     ForeignKey(f"{SCHEMA}.request_priority.req_priority_id"), nullable=False)
    req_status_id    = Column(Integer,     ForeignKey(f"{SCHEMA}.request_status.req_status_id"),     nullable=False)
    req_loc          = Column(String(125))
    iscalamity       = Column(Boolean)
    req_subj         = Column(String(125), nullable=False)
    req_desc         = Column(String(255), nullable=False)
    req_doc_link     = Column(String)
    audio_req_desc   = Column(String(255))
    submission_date  = Column(TIMESTAMP)
    serviced_date    = Column(TIMESTAMP)
    last_update_date = Column(TIMESTAMP)
 
    additional_info = relationship("ReqAddInfo",      back_populates="request")   # child table
    category        = relationship("HelpCategory",    back_populates="requests")
    user            = relationship("User",            back_populates="requests")
    status          = relationship("RequestStatus",   back_populates="requests")
    priority        = relationship("RequestPriority", back_populates="requests")
    type            = relationship("RequestType",     back_populates="requests")
 
 
# Transactional table — submitted answers per request
# CHILD of Request — req_id must exist in request first
# One row per answer (list-type or free-text)
class ReqAddInfo(Base):
    __tablename__  = "req_add_info"
    __table_args__ = {"schema": SCHEMA}
 
    id          = Column(Integer,     primary_key=True, autoincrement=True)
    req_id      = Column(String(255), ForeignKey(f"{SCHEMA}.request.req_id"),                  nullable=False)
    field_id    = Column(String(70),  ForeignKey(f"{SCHEMA}.req_add_info_metadata.field_id"),  nullable=False)
    item_id     = Column(String(100), ForeignKey(f"{SCHEMA}.list_item_metadata.item_id"),      nullable=True)
    field_value = Column(String(255), nullable=True)
 
    request   = relationship("Request",            back_populates="additional_info")  # back to parent
    metadata  = relationship("ReqAddInfoMetadata", back_populates="answers")          # resolves field name
    list_item = relationship("ListItemMetadata",   back_populates="answers")          # resolves item value

Retrieval Script

# app/services/request_service.py
 
from sqlalchemy.orm import Session, joinedload
from app.db.models import Request, ReqAddInfo
 
 
# Fetches full details of a specific request.
# Uses joinedload to fetch everything in ONE query.
# Parameters:
#   db      : SQLAlchemy Session (injected via FastAPI)
#   user_id : the user who submitted the request
#   req_id  : the specific request to fetch
def get_request_by_user_and_id(db: Session, user_id: str, req_id: str) -> dict:
    req = (
        db.query(Request)
        .options(
            joinedload(Request.user),
            joinedload(Request.category),
            joinedload(Request.status),
            joinedload(Request.priority),
            joinedload(Request.type),
            joinedload(Request.additional_info)
                .joinedload(ReqAddInfo.metadata),
            joinedload(Request.additional_info)
                .joinedload(ReqAddInfo.list_item),
        )
        .filter(
            Request.req_id      == req_id,
            Request.req_user_id == user_id
        )
        .first()
    )
 
    if not req:
        return {"error": f"No request found for req_id={req_id} and user_id={user_id}"}
 
    return _serialize_request(req)
 
 
# Fetches ALL requests submitted by a user.
# Parameters:
#   db      : SQLAlchemy Session
#   user_id : the user whose requests to fetch
def get_all_requests_by_user(db: Session, user_id: str) -> list:
    requests = (
        db.query(Request)
        .options(
            joinedload(Request.user),
            joinedload(Request.category),
            joinedload(Request.status),
            joinedload(Request.priority),
            joinedload(Request.type),
            joinedload(Request.additional_info)
                .joinedload(ReqAddInfo.metadata),
            joinedload(Request.additional_info)
                .joinedload(ReqAddInfo.list_item),
        )
        .filter(Request.req_user_id == user_id)
        .all()
    )
 
    if not requests:
        return []
 
    return [_serialize_request(req) for req in requests]
 
 
# Fetches ONLY the additional info answers for a request.
# Useful when base request fields are already loaded.
# Parameters:
#   db     : SQLAlchemy Session
#   req_id : the request whose additional info to fetch
def get_additional_info_by_request(db: Session, req_id: str) -> list:
    answers = (
        db.query(ReqAddInfo)
        .options(
            joinedload(ReqAddInfo.metadata),
            joinedload(ReqAddInfo.list_item),
        )
        .filter(ReqAddInfo.req_id == req_id)
        .all()
    )
 
    if not answers:
        return []
 
    return [_serialize_answer(a) for a in answers]
 
 
# Private helper — converts a Request ORM object into a plain dict.
# Used by get_request_by_user_and_id and get_all_requests_by_user.
def _serialize_request(req: Request) -> dict:
    return {
        "req_id"           : req.req_id,
        "req_subj"         : req.req_subj,
        "req_desc"         : req.req_desc,
        "req_loc"          : req.req_loc,
        "iscalamity"       : req.iscalamity,
        "req_doc_link"     : req.req_doc_link,
        "audio_req_desc"   : req.audio_req_desc,
        "submission_date"  : str(req.submission_date)  if req.submission_date  else None,
        "serviced_date"    : str(req.serviced_date)    if req.serviced_date    else None,
        "last_update_date" : str(req.last_update_date) if req.last_update_date else None,
 
        "user" : {
            "user_id"    : req.user.user_id    if req.user else None,
            "user_name"  : req.user.user_name  if req.user else None,
            "user_email" : req.user.user_email if req.user else None,
        },
 
        "category" : {
            "cat_id"   : req.category.cat_id   if req.category else None,
            "cat_name" : req.category.cat_name if req.category else None,
            "cat_desc" : req.category.cat_desc if req.category else None,
        },
 
        "status"   : {"status_name"   : req.status.status_name     if req.status   else None},
        "priority" : {"priority_name" : req.priority.priority_name if req.priority else None},
        "type"     : {"type_name"     : req.type.type_name         if req.type     else None},
 
        "additional_info" : [
            _serialize_answer(a) for a in req.additional_info
        ]
    }
 
 
# Private helper — converts a ReqAddInfo ORM object into a plain dict.
# Resolves field_id → field_name and item_id → item_value.
# Used by _serialize_request and get_additional_info_by_request.
def _serialize_answer(a: ReqAddInfo) -> dict:
    return {
        "field_id"    : a.field_id,
        "field_name"  : a.metadata.field_name_key if a.metadata  else None,
        "field_type"  : a.metadata.field_type     if a.metadata  else None,
        "item_id"     : a.item_id,
        "item_value"  : a.list_item.item_value    if a.list_item else None,
        "item_type"   : a.list_item.item_type     if a.list_item else None,
        "field_value" : a.field_value
    }

ORM Concerns

| Concern | Detail |
| --- | --- |
| N+1 without joinedload | Without .options(joinedload(...)), every relationship access fires a separate DB query — fixed above by always using joinedload |
| joinedload on lists | Using joinedload on a one-to-many (like additional_info) can duplicate parent rows in SQL — SQLAlchemy handles deduplication but it adds memory overhead |
| Learning curve | Team must understand lazy vs eager loading, or they'll accidentally trigger N+1 without realising |
| Generated SQL not obvious | Hard to know exactly what SQL is being fired — use echo=True in create_engine to debug |
| Overkill for simple fetch | For just two IDs, raw SQL Option 1 is simpler and more predictable |
| Serialization is manual | ORM objects can't be returned directly as JSON — _serialize_request() must be maintained separately as the schema evolves |

Recommendation

For this use case, Option 1 or Option 2 is the better fit. Both are simple, transparent, and easy to debug — perfectly sufficient since you're fetching a single request by two known IDs.

Clone this wiki locally