Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions data-engineering/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
## Local Setup

```bash
git clone -b dev https://github.com/saayam-for-all/data.git
git clone -b main https://github.com/saayam-for-all/data.git
cd data
python -m venv venv
source venv/bin/activate # macOS/Linux — or venv\Scripts\activate on Windows
cd data-engineering
pip install -r requirements.txt
cp .env.example .env # Fill in your environment variables
```
Expand Down Expand Up @@ -174,19 +175,19 @@ All Lambda functions live under `src/`. Create a new folder for your Lambda:
src/
└── your_lambda_name/
├── __init__.py # Required - makes it a Python package
├── handler.py # Entry point for the Lambda
├── requirements.txt # Lambda-specific dependencies
└── other_modules.py # Supporting code (optional)
├── lambda_function.py # Entry point (must have lambda_handler function)
├── helpers.py # Supporting code (optional)
└── requirements.txt # Lambda-specific dependencies (lightweight only)
```

**Steps:**
1. Create folder: `mkdir src/your_lambda_name`
2. Add `__init__.py`: `touch src/your_lambda_name/__init__.py`
3. Create `handler.py` with your Lambda entry point
4. Add a `requirements.txt` with dependencies specific to this Lambda
5. Add a deploy script: `scripts/deploy/deploy_your_lambda.sh`
3. Create `lambda_function.py` with a `lambda_handler(event, context)` function
4. Add a `requirements.txt` with lightweight dependencies (heavy packages like pandas should be Lambda Layers)
5. Push via PR → auto-deploys on merge to main

**Example:** See `src/aggregator/` or `src/categorizer/` for reference.
**Reference:** See [`src/saayam-org-aggregator/`](src/saayam-org-aggregator/) for a complete working example.

### Adding a New Scraper

Expand Down
101 changes: 101 additions & 0 deletions data-engineering/src/aggregate-daily-metrics/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import boto3
import json
import pg8000
from aws_lambda_powertools.utilities import parameters
import pandas as pd

s3_client = boto3.client('lambda')

_creds = json.loads(parameters.get_parameter(
'/dev/saayam/db/Virginia/Analytics/user',
decrypt=True,
max_age=3600
))
_db_name = _creds['DATABASE NAME']
_db_conn = pg8000.connect(
host=_creds['HOST'],
user=_creds['USERNAME'],
password=_creds['PASSWORD'],
database=_db_name,
port=_creds['PORT'],
ssl_context=True
)

def get_metric_numbers(data):
'''
totalRequests: 0,
requestsResolved: 0,
totalVolunteers: 0,
totalBeneficiaries: 0
'''

def get_requests_metrics():

sql_query = '''
select
r.*,
rs.req_status
from {_db_name}.request as r
left join {_db_name}.request_status as rs on rs.req_status_id = r.req_status_id

'''
df = pd.read_sql(sql_query)

totalRequests = len(df)
requestsResolved = len(df[df["req_status_id"] == 3])

return {
"totalRequests": totalRequests,
"requestsResolved": requestsResolved
}

def get_volunteers_metrics():

sql_query = '''
select
*,
from {_db_name}..volunteer_details
'''
df = pd.read_sql(sql_query)

return {
"totalVolunteers": len(df)
}

def get_beneficiary_metrics():
# sql_query = '''
# select
# *,
# from {_db_name}..volunteer_details
# '''
# df = pd.read_sql(sql_query)

return {
"totalBeneficiaries": 99
}

def get_metrics():
try:

requests_metrics = get_requests_metrics()
volunteers_metrics = get_volunteers_metrics()
beneficiaries_metrics = get_beneficiary_metrics()

metrics = {**requests_metrics, **volunteers_metrics, **beneficiaries_metrics}

return metrics

except pg8000.DatabaseError as e:
raise Exception(f'Database error: {str(e)}')
except Exception as e:
raise Exception(f'Error fetching from DB: {str(e)}')

def write_metrics_to_s3(metrics, bucket, file_path):
s3_client.put_object(
Bucket=bucket,
Key=file_path,
Body=json.dumps(metrics, indent=2),
ContentType="application/json"
)


33 changes: 33 additions & 0 deletions data-engineering/src/aggregate-daily-metrics/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json
from helpers import get_metrics, write_metrics_to_s3

def lambda_handler(event, context):
try:

bucket = "saayam-virginia-public/",
file_path ="homepage_metrics/metrics.json"

raw_body = event.get("body")
body = None

if(isinstance(raw_body, str)):
body = json.loads(raw_body)
else:
body = event

metrics = get_metrics()

print(metrics)

write_metrics_to_s3(metrics, bucket, file_path)

return {
"statusCode": 200,
}

except json.JSONDecodeError as e:
return {'statusCode': 400, 'body': json.dumps({'error': f'Invalid JSON in request body: {str(e)}'})}
except Exception as e:
return {'statusCode': 500, 'body': json.dumps({'error': f'Internal server error: {str(e)}'})}


65 changes: 29 additions & 36 deletions data-engineering/src/saayam-org-aggregator/helpers.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,39 @@
import subprocess
import boto3
import sys
import json
from aws_lambda_powertools.utilities import parameters
import pandas as pd

subprocess.call([sys.executable, "-m", "pip", "install", "pg8000", "-t", "/tmp/"])
sys.path.insert(0, "/tmp/")
import pg8000


GEN_AI_LAMBDA = "More_Org_GenAI_Py_v3126"

# --- All cached at module level, initialized once on cold start ---
lambda_client = boto3.client('lambda')

_creds = json.loads(parameters.get_parameter(
'/dev/saayam/db/Virginia/Analytics/user',
decrypt=True,
max_age=3600
))
_db_name = _creds['DATABASE NAME']
_db_conn = pg8000.connect(
host=_creds['HOST'],
user=_creds['USERNAME'],
password=_creds['PASSWORD'],
database=_db_name,
port=_creds['PORT'],
ssl_context=True
)
# -----------------------------------------------------------------

def get_orgs_from_db(location, category):
try:
creds = json.loads(parameters.get_parameter(
'/dev/saayam/db/Virginia/Analytics/user',
decrypt=True,
max_age=3600
))
database = creds['DATABASE NAME']

conn = pg8000.connect(
host=creds['HOST'],
user=creds['USERNAME'],
password=creds['PASSWORD'],
database=database,
port=creds['PORT'],
ssl_context=True
)

df = pd.read_sql(
f"SELECT * FROM {database}.organizations WHERE mission = '{category}' AND city_name = '{location}'",
conn
f"SELECT * FROM {_db_name}.organizations WHERE mission = '{category}' AND city_name = '{location}'",
_db_conn
)
conn.close()
df["db_or_ai"] = "db"
return df

except parameters.GetParameterError as e:
raise Exception(f'Failed to retrieve DB credentials: {str(e)}')
except pg8000.DatabaseError as e:
raise Exception(f'Database error: {str(e)}')
except Exception as e:
Expand All @@ -46,7 +42,7 @@ def get_orgs_from_db(location, category):

def get_ai_orgs(subject, description, location):
try:
response = boto3.client('lambda').invoke(
response = lambda_client.invoke(
FunctionName=GEN_AI_LAMBDA,
InvocationType='RequestResponse',
Payload=json.dumps({
Expand All @@ -55,14 +51,12 @@ def get_ai_orgs(subject, description, location):
"location": location
})
)

payload = json.loads(response['Payload'].read())

if payload.get('statusCode') != 200:
raise Exception(f'GenAI Lambda returned error: {payload}')

return pd.DataFrame(payload['body']['organizations'])

orgs = pd.DataFrame(payload['body']['organizations'])
orgs["db_or_ai"] = "ai"
return orgs
except boto3.exceptions.Boto3Error as e:
raise Exception(f'Failed to invoke GenAI Lambda: {str(e)}')
except (KeyError, TypeError) as e:
Expand All @@ -77,14 +71,13 @@ def merge_organizations(db_organizations, genAI_organizations):
'org_name': 'name',
'city_name': 'location',
'phone': 'contact'
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source']]
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source', "db_or_ai"]]

genAI_organizations = genAI_organizations.rename(columns={
'organization_name': 'name'
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source']]
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source', "db_or_ai"]]

return pd.concat([db_organizations, genAI_organizations], ignore_index=True)

except KeyError as e:
raise Exception(f'Missing expected column during merge: {str(e)}')
except Exception as e:
Expand Down
24 changes: 19 additions & 5 deletions data-engineering/src/saayam-org-aggregator/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
from helpers import get_ai_orgs, get_orgs_from_db, merge_organizations
from concurrent.futures import ThreadPoolExecutor

VERSION = "1.0.1" # Test auto-deploy

# handle the lambda function call
def lambda_handler(event, context):
try:
raw_body = event.get("body")
Expand All @@ -19,13 +19,27 @@ def lambda_handler(event, context):
'body': json.dumps({'error': 'location and category are required fields'})
}

db_organizations = get_orgs_from_db(location, category)
genAI_organizations = get_ai_orgs(subject, description, location)
# db_organizations = get_orgs_from_db(location, category)
# genAI_organizations = get_ai_orgs(subject, description, location)

#Implemented parallelization to speed up the reponse

with ThreadPoolExecutor() as executor:
db_future = executor.submit(get_orgs_from_db, location, category)
ai_future = executor.submit(get_ai_orgs, subject, description, location)

db_organizations = db_future.result()
genAI_organizations = ai_future.result()

combined_list = merge_organizations(db_organizations, genAI_organizations)

return {
'statusCode': 200,
'body': combined_list.to_dict(orient='records')
'headers': {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": '*'
},
'body': json.dumps(combined_list.to_dict(orient='records'))
}

except json.JSONDecodeError as e:
Expand Down
Loading