Skip to content

Commit a62c5db

Browse files
committed
make changes for integration
1 parent a5ef5c4 commit a62c5db

5 files changed

Lines changed: 48 additions & 41 deletions

File tree

data-engineering/src/saayam-org-aggregator/__init__.py renamed to data-engineering/src/aggregate-daily-metrics/__init__.py

File renamed without changes.

data-engineering/src/aggregate-daily-metrics/helpers.py

Whitespace-only changes.

data-engineering/src/aggregate-daily-metrics/lambda_function.py

Whitespace-only changes.

data-engineering/src/saayam-org-aggregator/helpers.py

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,39 @@
1-
import subprocess
21
import boto3
3-
import sys
42
import json
53
from aws_lambda_powertools.utilities import parameters
64
import pandas as pd
7-
8-
subprocess.call([sys.executable, "-m", "pip", "install", "pg8000", "-t", "/tmp/"])
9-
sys.path.insert(0, "/tmp/")
105
import pg8000
116

7+
128
GEN_AI_LAMBDA = "More_Org_GenAI_Py_v3126"
139

10+
# --- All cached at module level, initialized once on cold start ---
11+
lambda_client = boto3.client('lambda')
12+
13+
_creds = json.loads(parameters.get_parameter(
14+
'/dev/saayam/db/Virginia/Analytics/user',
15+
decrypt=True,
16+
max_age=3600
17+
))
18+
_db_name = _creds['DATABASE NAME']
19+
_db_conn = pg8000.connect(
20+
host=_creds['HOST'],
21+
user=_creds['USERNAME'],
22+
password=_creds['PASSWORD'],
23+
database=_db_name,
24+
port=_creds['PORT'],
25+
ssl_context=True
26+
)
27+
# -----------------------------------------------------------------
28+
1429
def get_orgs_from_db(location, category):
1530
try:
16-
creds = json.loads(parameters.get_parameter(
17-
'/dev/saayam/db/Virginia/Analytics/user',
18-
decrypt=True,
19-
max_age=3600
20-
))
21-
database = creds['DATABASE NAME']
22-
23-
conn = pg8000.connect(
24-
host=creds['HOST'],
25-
user=creds['USERNAME'],
26-
password=creds['PASSWORD'],
27-
database=database,
28-
port=creds['PORT'],
29-
ssl_context=True
30-
)
31-
3231
df = pd.read_sql(
33-
f"SELECT * FROM {database}.organizations WHERE mission = '{category}' AND city_name = '{location}'",
34-
conn
32+
f"SELECT * FROM {_db_name}.organizations WHERE mission = '{category}' AND city_name = '{location}'",
33+
_db_conn
3534
)
36-
conn.close()
35+
df["db_or_ai"] = "db"
3736
return df
38-
39-
except parameters.GetParameterError as e:
40-
raise Exception(f'Failed to retrieve DB credentials: {str(e)}')
4137
except pg8000.DatabaseError as e:
4238
raise Exception(f'Database error: {str(e)}')
4339
except Exception as e:
@@ -46,7 +42,7 @@ def get_orgs_from_db(location, category):
4642

4743
def get_ai_orgs(subject, description, location):
4844
try:
49-
response = boto3.client('lambda').invoke(
45+
response = lambda_client.invoke(
5046
FunctionName=GEN_AI_LAMBDA,
5147
InvocationType='RequestResponse',
5248
Payload=json.dumps({
@@ -55,14 +51,12 @@ def get_ai_orgs(subject, description, location):
5551
"location": location
5652
})
5753
)
58-
5954
payload = json.loads(response['Payload'].read())
60-
6155
if payload.get('statusCode') != 200:
6256
raise Exception(f'GenAI Lambda returned error: {payload}')
63-
64-
return pd.DataFrame(payload['body']['organizations'])
65-
57+
orgs = pd.DataFrame(payload['body']['organizations'])
58+
orgs["db_or_ai"] = "ai"
59+
return orgs
6660
except boto3.exceptions.Boto3Error as e:
6761
raise Exception(f'Failed to invoke GenAI Lambda: {str(e)}')
6862
except (KeyError, TypeError) as e:
@@ -77,14 +71,13 @@ def merge_organizations(db_organizations, genAI_organizations):
7771
'org_name': 'name',
7872
'city_name': 'location',
7973
'phone': 'contact'
80-
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source']]
74+
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source', "db_or_ai"]]
8175

8276
genAI_organizations = genAI_organizations.rename(columns={
8377
'organization_name': 'name'
84-
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source']]
78+
})[['name', 'location', 'contact', 'email', 'web_url', 'mission', 'source', "db_or_ai"]]
8579

8680
return pd.concat([db_organizations, genAI_organizations], ignore_index=True)
87-
8881
except KeyError as e:
8982
raise Exception(f'Missing expected column during merge: {str(e)}')
9083
except Exception as e:

data-engineering/src/saayam-org-aggregator/lambda_function.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import json
22
from helpers import get_ai_orgs, get_orgs_from_db, merge_organizations
3+
from concurrent.futures import ThreadPoolExecutor
34

4-
VERSION = "1.0.1" # Test auto-deploy
5-
5+
# handle the lambda function call
66
def lambda_handler(event, context):
77
try:
88
raw_body = event.get("body")
@@ -19,13 +19,27 @@ def lambda_handler(event, context):
1919
'body': json.dumps({'error': 'location and category are required fields'})
2020
}
2121

22-
db_organizations = get_orgs_from_db(location, category)
23-
genAI_organizations = get_ai_orgs(subject, description, location)
22+
# db_organizations = get_orgs_from_db(location, category)
23+
# genAI_organizations = get_ai_orgs(subject, description, location)
24+
25+
#Implemented parallelization to speed up the reponse
26+
27+
with ThreadPoolExecutor() as executor:
28+
db_future = executor.submit(get_orgs_from_db, location, category)
29+
ai_future = executor.submit(get_ai_orgs, subject, description, location)
30+
31+
db_organizations = db_future.result()
32+
genAI_organizations = ai_future.result()
33+
2434
combined_list = merge_organizations(db_organizations, genAI_organizations)
2535

2636
return {
2737
'statusCode': 200,
28-
'body': combined_list.to_dict(orient='records')
38+
'headers': {
39+
"Content-Type": "application/json",
40+
"Access-Control-Allow-Origin": '*'
41+
},
42+
'body': json.dumps(combined_list.to_dict(orient='records'))
2943
}
3044

3145
except json.JSONDecodeError as e:

0 commit comments

Comments
 (0)