Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions data-analytics/lambda_functions/beneficiary_trend_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import json
import pandas as pd
from datetime import datetime
import io
import os
import sys

current_dir = os.path.dirname(os.path.abspath(__file__))
data_engineering_path = os.path.join(current_dir, '..', '..', 'data-engineering')
sys.path.append(data_engineering_path)

from src.utils.db_client import DatabaseClient

def lambda_handler(event, context):
conn = None
cursor = None
try:
db = DatabaseClient()
conn = db.conn
conn.autocommit = False
cursor = conn.cursor()

REAL_TABLE_REQUEST = "ireland_dev_saayam_rdbms.request"
REAL_TABLE_USERS = "ireland_dev_saayam_rdbms.users"
REAL_TABLE_COUNTRY = "ireland_dev_saayam_rdbms.country"

###############################################################################################
###############################################################################################
###############################################################################################

def aggregate_beneficiaries(interval):
try:
# Queries the request table for dates within an interval length of time before the present.
query = f"""
SELECT DISTINCT req_user_id, last_update_date
FROM {REAL_TABLE_REQUEST}
WHERE last_update_date > CURRENT_TIMESTAMP - INTERVAL '{interval}'
"""
cursor.execute(query)

except Exception as e:
# Returns an empty list if query fails.
return []
dates = cursor.fetchall()
beneficiary_date = [t[1] for t in dates if len(t) > 1]

return beneficiary_date

def get_beneficiaries_dic(interval, group_by="day"):
# Fetch data
beneficiaries_dates = aggregate_beneficiaries(interval)

if not beneficiaries_dates:
return []

# Convert to DataFrame
df = pd.DataFrame(beneficiaries_dates, columns=["last_update_date"])
df["last_update_date"] = pd.to_datetime(df["last_update_date"])

# Group by day or month
if group_by == "day":
df_grouped = (
df.groupby(df["last_update_date"].dt.date) # group by date
.size()
.reset_index(name="Count")
)
df_grouped["Date"] = df_grouped["last_update_date"].apply(
lambda x: pd.Timestamp(x).isoformat()
)
elif group_by == "month":
df_grouped = (
df.groupby(df["last_update_date"].dt.to_period("M"))
.size()
.reset_index(name="Count")
)
df_grouped["Date"] = df_grouped["last_update_date"].apply(lambda x: x.to_timestamp().isoformat()
)
else:
raise ValueError("group_by must be either 'day' or 'month'")

# Build list of dicts
dic = df_grouped[["Date", "Count"]].to_dict("records")

return dic

def aggregate_beneficiaries_country():
try:
# Queries the database for beneficaries and their country.
query = f"""
SELECT DISTINCT req_user_id, c.country_name
FROM {REAL_TABLE_REQUEST}
INNER JOIN {REAL_TABLE_USERS} as u ON {REAL_TABLE_REQUEST}.req_user_id = u.user_id
INNER JOIN {REAL_TABLE_COUNTRY} as c ON u.country_id = c.country_id
"""
cursor.execute(query)

except Exception as e:
# Returns a dictionary with a status code of 500 if query fails.
return {
'status_code': 500,
'error': 'Could not query the database.',
'beneficiaries by country': []
}
rows = cursor.fetchall()

# Count how many beneficiaries per country
if not rows:
return []

df = pd.DataFrame(rows, columns=["user_id", "country"])
df_grouped = df.groupby("country").size().reset_index(name="Count")

# Return as list of dicts
dic = df_grouped.to_dict("records")

return dic

###############################################################################################
###############################################################################################
###############################################################################################

def aggregate_help_requests(interval):
try:
# Queries the database for help requests within an interval length of time before the present.
query = f"""
SELECT submission_date
FROM {REAL_TABLE_REQUEST}
WHERE submission_date > CURRENT_TIMESTAMP - INTERVAL '{interval}'
"""
cursor.execute(query)

except Exception as e:
# Returns an empty list if query fails.
return []
dates = cursor.fetchall()
request_date = [t[0] for t in dates if len(t) > 0]

return request_date

def get_help_requests_dic(interval, group_by="day"):
# Fetch data
request_dates = aggregate_help_requests(interval)

if not request_dates:
return []

# Convert to DataFrame
df = pd.DataFrame(request_dates, columns=["submission_date"])
df["submission_date"] = pd.to_datetime(df["submission_date"])

# Group by day or month
if group_by == "day":
df_grouped = (
df.groupby(df["submission_date"].dt.date) # group by date
.size()
.reset_index(name="Count")
)
df_grouped["Date"] = df_grouped["submission_date"].apply(
lambda x: pd.Timestamp(x).isoformat()
)
elif group_by == "month":
df_grouped = (
df.groupby(df["submission_date"].dt.to_period("M"))
.size()
.reset_index(name="Count")
)
df_grouped["Date"] = df_grouped["submission_date"].apply(lambda x: x.to_timestamp().isoformat()
)
else:
raise ValueError("group_by must be either 'day' or 'month'")

# Build list of dicts
dic = df_grouped[["Date", "Count"]].to_dict("records")

return dic

def aggregate_help_requests_country():
# Queries for help requests and their country.
try:
query = f"""
SELECT c.country_name
FROM {REAL_TABLE_REQUEST}
INNER JOIN {REAL_TABLE_USERS} as u on {REAL_TABLE_REQUEST}.req_user_id = u.user_id
INNER JOIN {REAL_TABLE_COUNTRY} as c on u.country_id = c.country_id
"""
cursor.execute(query)

except Exception as e:
# Returns a dictionary with status code 500 if query fails.
return {
'status_code': 500,
'error': 'Could not query the database.',
'requests by country': []
}
rows = cursor.fetchall()

# Count how many beneficiaries per country
if not rows:
return []

df = pd.DataFrame(rows, columns=["country"])
df_grouped = df.groupby("country").size().reset_index(name="Count")

# Return as list of dicts
dic = df_grouped.to_dict("records")

return dic


# Obtains the dictionaries for beneficiaries categorized by time and country.
beneficiaries_days = get_beneficiaries_dic("7 days", "day")
beneficiaries_month = get_beneficiaries_dic("1 month", "day")
beneficiaries_year = get_beneficiaries_dic("1 year", "month")
beneficiaries_country = aggregate_beneficiaries_country()

# Obtains the dictionaries for help requests categorized by time and country.
help_requests_days = get_help_requests_dic("7 days", "day")
help_requests_month = get_help_requests_dic("1 month", "day")
help_requests_year = get_help_requests_dic("1 year", "month")
help_requests_country = aggregate_help_requests_country()

response_body = {"7 days beneficiaries": beneficiaries_days,
"1 month beneficiaries": beneficiaries_month,
"1 year beneficiaries": beneficiaries_year,
"Country beneficiaries": beneficiaries_country,
"7 days help requests": help_requests_days,
"1 month help requests": help_requests_month,
"1 year help requests": help_requests_year,
"Country help requests": help_requests_country}

# Returns a status code of 200 and a JSON body consisting of beneficaries and requests by time and country.
http_res = {
'statusCode': 200,
'body': json.dumps(response_body)
}
return http_res

except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
finally:
if cursor:
cursor.close()
if conn:
conn.close()
print("Database connection successfully closed")

if __name__ == "__main__":
print(lambda_handler({}, None))
2 changes: 2 additions & 0 deletions data-analytics/lambda_functions/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2-binary
pandas
23 changes: 23 additions & 0 deletions data-engineering/src/utils/db_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import psycopg2

class DatabaseClient:
"""Abstraction layer for database operations.

When we migrate to Redis/vector store, we only need to
change this file, not every file that uses the DB.
"""
def __init__(self):
# Currently PostgreSQL
self.conn = self._get_postgres_connection()

def _get_postgres_connection(self):
"""Initializes and returns the psycopg2 database connection."""
db_config = {
"host": os.environ.get('host'),
"port": os.environ.get('port'),
"dbname": os.environ.get('dbname'),
"user": os.environ.get('user'),
"password": os.environ.get('password')
}
return psycopg2.connect(**db_config)