1-
21import boto3
32import logging
43from botocore .exceptions import ClientError
54import pandas as pd
6-
7-
85import numpy as np
96import json
107import datetime
11-
128import io
139import os
1410
1511from gensim .models import Word2Vec
1612from sklearn .metrics .pairwise import cosine_similarity
1713from gensim import matutils
18-
19- from tqdm import tqdm
2014from dynamodb import *
2115
2216
23- #dev setting
17+ # environment variables for lambda
18+ file_name = os .environ ['FILE_NAME' ]
19+ file_name_origianl = os .environ ['FILE_NAME_ORIGINAL' ]
20+ bucket_name_nlp = os .environ ['BUCKET_NAME_NLP' ]
21+ bucket_name = os .environ ['BUCKET_NAME' ]
22+
23+ """
24+ #dev setting -- comment out for release
2425file_name = "Processed_records.parquet"
2526bucket_name_nlp = "nlp-data-preprocessing"
2627file_name_origianl = "records.parquet"
2728bucket_name = "webpresence-geocore-geojson-to-parquet-dev"
29+ """
2830
2931def lambda_handler (event , context ):
32+ """
3033 #Change directory to /tmp folder, this is required if new files are created for lambda
3134 os.chdir('/tmp') #This is important
3235 #Make a directory
3336 if not os.path.exists(os.path.join('mydir')):
3437 os.makedirs('mydir')
35-
38+ """
3639 # Read the preprocessed data from S3
37- df_en = open_S3_file_as_df (bucket_name_nlp , file_name )
40+ try :
41+ df_en = open_S3_file_as_df (bucket_name_nlp , file_name )
42+ except ClientError as e :
43+ print ('Accessing the S3 was failed on line 47 when calling df_en = open_S3_file_as_df(bucket_name_nlp, file_name)' )
44+ print (e .response ['Error' ]['Message' ])
3845
39- # Get a sample of 500 rows as the training data
40- df = df_en [['features_properties_id' , 'features_properties_title_en' , 'metadata_en_processed' ]]
41- #df = df.sample(n=500, random_state=1)
42- # Use all data to train the model
43- df .head ()
44- print (df .shape )
45-
46-
4746 # Use all data to train the model
4847 df = df_en [['features_properties_id' , 'features_properties_title_en' , 'features_properties_title_fr' ,'metadata_en_processed' ]]
4948 print (f'The shape of the preprocessed df is { df .shape } ' )
49+ # Replace the missing value in the 'features_properties_title_en' column with an empty string
50+ df ['features_properties_title_en' ].fillna ('' , inplace = True )
51+
5052
5153 # Prepare the input for the Word2Vec model
5254 sentences = df ['metadata_en_processed' ].apply (lambda x : x .split (' ' )).tolist ()
@@ -58,55 +60,21 @@ def lambda_handler(event, context):
5860
5961 # Convert each sentence in 'metadata_preprocessed' into a vector
6062 vectors = df ['metadata_en_processed' ].apply (sentence_to_vector , model = model )
61- # Replace the missing value in the 'features_properties_title_en' column with an empty string
62- df ['features_properties_title_en' ].fillna ('' , inplace = True )
63+
6364
6465 # Calculate similarity between each vector and all others
6566 similarity_matrix = cosine_similarity (np .array (vectors .tolist ()))
6667
67-
68- # Initialize new columns for the top 5 similar texts
69- df ['sim1' ], df ['sim2' ], df ['sim3' ], df ['sim4' ], df ['sim5' ] = "" , "" , "" , "" , ""
70-
71- # For each text, find the top 5 most similar texts and append their 'features_properties_title_en' as new columns
72- df .reset_index (drop = True , inplace = True )
73- for i in tqdm (range (similarity_matrix .shape [0 ])):
74- top_5_similar = np .argsort (- similarity_matrix [i , :])[1 :6 ] # Exclude the text itself
75- df .loc [i , ['sim1' , 'sim2' , 'sim3' , 'sim4' , 'sim5' ]] = df .loc [top_5_similar , 'features_properties_title_en' ].values
76-
77- # Read the original parquet file and merge by features_properties_id
78- df_original = open_S3_file_as_df (bucket_name , file_name_origianl )
79- merged_df = df_original .merge (df [['features_properties_id' , 'sim1' , 'sim2' , 'sim3' , 'sim4' , 'sim5' ]], on = 'features_properties_id' , how = 'left' )
80- """ Option 1: merge the similar results with records.parquet directly
81- # Initialize new columns for the top 10 similar texts
82- df['sim1'], df['sim2'], df['sim3'], df['sim4'], df['sim5'],df['sim6'], df['sim7'], df['sim8'], df['sim9'], df['sim10'] = "", "", "", "", "","", "", "", "", ""
8368
84- # For each text, find the top 10 most similar texts and append their 'features_properties_title_en' as new columns
85- df.reset_index(drop=True, inplace=True)
86- for i in tqdm(range(similarity_matrix.shape[0])):
87- top_10_similar = np.argsort(-similarity_matrix[i, :])[1:11] # Exclude the text itself
88- df.loc[i, ['sim1', 'sim2', 'sim3', 'sim4', 'sim5','sim6', 'sim7', 'sim8', 'sim9', 'sim10']] = df.loc[top_10_similar, 'features_properties_id'].values
89-
90- # Read the original parquet file and merge by features_properties_id
91- df_original = open_S3_file_as_df(bucket_name, file_name_origianl)
92- merged_df = df_original.merge(df[['features_properties_id', 'sim1', 'sim2', 'sim3', 'sim4', 'sim5','sim6', 'sim7', 'sim8', 'sim9', 'sim10']],
93- on='features_properties_id', how='left')
94- print(f'the shape of original parquet file is {df_original.shape}')
95-
96- # Save to temp folder, see https://iotespresso.com/temporary-storage-during-aws-lambda-runtime-python/
97- save_path = os.path.join(os.getcwd(), 'mydir', 'merged_df')
98- merged_df.to_csv(save_path)
99- df_fetched= pd.read_csv(save_path)
100- print(f'the shape of merged parquet file is {merged_df.shape}')
101- # upload merged dataframe to S3
102- upload_dataframe_to_s3_as_parquet(df=df_fetched, bucket_name=bucket_name_nlp, file_key='sim_word2vec_records.parquet')
103- """
69+ # Upload the similar results as a AWS dynamodb
70+ """
71+ The parquet lambda function has been modified to merge the similairy table with records.parquet everytime when records.parquet is updated.
10472
105- #Option 2: upload the similar results as a dynamodb, and merge the tabke with records.parquet everytime when records.parquet is updated
73+ """
10674 df ['similarity' ] = np .nan # Initialize the column
10775 # For each text, find the top 10 most similar texts and save them as a JSON array object in the 'similarity' column
10876 df .reset_index (drop = True , inplace = True )
109- for i in tqdm ( range (similarity_matrix .shape [0 ]) ):
77+ for i in range (similarity_matrix .shape [0 ]):
11078 top_10_similar = np .argsort (- similarity_matrix [i , :])[1 :11 ] # Exclude the text itself
11179 sim_array = []
11280 for j , idx in enumerate (top_10_similar ):
@@ -137,7 +105,6 @@ def lambda_handler(event, context):
137105 delete_table (TableName = 'similarity' )
138106 waiter = client .get_waiter ('table_not_exists' )
139107 waiter .wait (TableName = 'similarity' )
140- print ('Before create' )
141108 except ClientError as e :
142109 print (e )
143110 #Create table
@@ -148,8 +115,20 @@ def lambda_handler(event, context):
148115 waiter .wait (TableName = 'similarity' )
149116 except ClientError as e :
150117 print (e )
118+
119+ """DEBUG
120+ #Check if empty string in the primary key before scan the table
121+ empty_string_rows = df[df['features_properties_id'] == '']
122+ print(f'Number of NA values in the df id column is \n {empty_string_rows}')
123+ """
124+ #Remove rows with empty string in 'features_properties_id', primary key can not be empty in DynamoDB table
125+ df_cleaned = df [df ['features_properties_id' ]!= '' ]
126+ rows_removed = df .shape [0 ] - df_cleaned .shape [0 ]
127+ print (f'Removed { rows_removed } rows with empyt string in features_properties_id' )
151128 #Batch write to table
152- batch_write_items_into_table (df , TableName = 'similarity' )
129+ batch_write_items_into_table (df_cleaned , TableName = 'similarity' )
130+
131+
153132
154133# Function to read the parquet file as pandas dataframe
155134def open_S3_file_as_df (bucket_name , file_name ):
0 commit comments