11# ------ import module ------
22from datetime import datetime , timezone , timedelta
33import boto3
4- import pickle , gzip , json
4+ import pickle
5+ import gzip
6+ import json
57import pandas as pd
68import numpy as np
79import os
1113from upload_data import upload_timestream , update_latest , save_raw , update_query_selector , update_config
1214from compare_data import compare , compare_max_instance
1315
16+
1417class FirstRunError (Exception ):
1518 pass
1619
20+
1721def main ():
1822 print ("Start Lambda Function" )
1923 start_time = datetime .now (timezone .utc )
@@ -25,15 +29,15 @@ def main():
2529 TIMESTAMP = start_time .replace (minute = ((start_time .minute // 10 ) * 10 ), second = 0 ) - timedelta (minutes = 10 )
2630 S3_DIR_NAME = TIMESTAMP .strftime ('%Y/%m/%d' )
2731 S3_OBJECT_PREFIX = TIMESTAMP .strftime ('%H-%M' )
28-
32+
2933 SPS_FILE_PREFIX = f"{ BUCKET_FILE_PATH } /sps/{ S3_DIR_NAME } "
3034 SPOTIF_FILE_NAME = f"{ BUCKET_FILE_PATH } /spot_if/{ S3_DIR_NAME } /{ S3_OBJECT_PREFIX } _spot_if.pkl.gz"
3135 ONDEMAND_PRICE_FILE_NAME = f"{ BUCKET_FILE_PATH } /ondemand_price/{ S3_DIR_NAME } /ondemand_price.pkl.gz"
3236 SPOTPRICE_FILE_NAME = f"{ BUCKET_FILE_PATH } /spot_price/{ S3_DIR_NAME } /{ S3_OBJECT_PREFIX } _spot_price.pkl.gz"
3337
3438 # ------ Set time data ------
3539 time_value = TIMESTAMP .strftime ("%Y-%m-%d %H:%M:%S" )
36-
40+
3741 try :
3842 # ------ Create Boto3 Session ------
3943 s3 = boto3 .resource ("s3" )
@@ -61,7 +65,7 @@ def main():
6165 spotinfo_df = spotinfo_df [['InstanceType' , 'Region' , 'IF' ]]
6266 ondemand_price_df = ondemand_price_df [['InstanceType' , 'Region' , 'OndemandPrice' ]]
6367 spot_price_df = spot_price_df [['InstanceType' , 'AZ' , 'SpotPrice' ]]
64-
68+
6569 # ------ Formatting Data ------
6670 spot_price_df ['SpotPrice' ] = spot_price_df ['SpotPrice' ].astype ('float' ).round (5 )
6771 ondemand_price_df ['OndemandPrice' ] = ondemand_price_df ['OndemandPrice' ].astype ('float' ).round (5 )
@@ -84,7 +88,7 @@ def main():
8488 merge_df ['T2' ] = merge_df ['T2' ].fillna (0 ).astype ('int' )
8589
8690 merge_df = merge_df .drop (merge_df [(merge_df ['AZ' ].isna ()) | (merge_df ['Region' ].isna ()) | (merge_df ['InstanceType' ].isna ())].index )
87-
91+
8892 merge_df .reset_index (drop = True , inplace = True )
8993 merge_df ['Time' ] = time_value
9094
@@ -101,11 +105,11 @@ def main():
101105 # Verify that the data is in the old format
102106 columns_to_check = ["T3" , "T2" ]
103107 existing_columns = [col for col in columns_to_check if col in previous_df .columns ]
104-
108+
105109 if len (existing_columns ) == 0 :
106110 raise FirstRunError ("Can't load the previous df from s3 bucket or First run since changing the collector" )
107111 else :
108- previous_df = previous_df .drop (columns = ['Id ' ])
112+ previous_df = previous_df .drop (columns = ['id ' ])
109113 except FirstRunError as e :
110114 # If system is first time uploading data, make a new one and upload it to TSDB
111115 update_latest (merge_df , TIMESTAMP )
@@ -117,21 +121,21 @@ def main():
117121
118122 end_time = datetime .now (timezone .utc )
119123 print (f"Checking time of previous json file is { (end_time - start_time ).total_seconds () * 1000 / 60000 :.2f} min" )
120-
124+
121125 start_time = datetime .now (timezone .utc )
122-
126+
123127 # ------ Compare T3 and T2 Data ------
124128 current_df = compare_max_instance (previous_df , merge_df , target_capacity )
125129
126130 # ------ Upload Merge DF to s3 Bucket ------
127131 update_latest (current_df , TIMESTAMP )
128132 save_raw (current_df , TIMESTAMP )
129-
133+
130134 # ------ Compare All Data ------
131135 workload_cols = ['InstanceType' , 'Region' , 'AZ' ]
132136 feature_cols = ['SPS' , 'T3' , 'T2' , 'IF' , 'SpotPrice' , 'OndemandPrice' ]
133137
134- changed_df , removed_df = compare (previous_df , current_df , workload_cols , feature_cols ) # compare previous_df and current_df to extract changed rows)
138+ changed_df , removed_df = compare (previous_df , current_df , workload_cols , feature_cols ) # compare previous_df and current_df to extract changed rows)
135139 end_time = datetime .now (timezone .utc )
136140 print (f"Compare time is { (end_time - start_time ).total_seconds () * 1000 / 60000 :.2f} min" )
137141
@@ -152,13 +156,14 @@ def main():
152156 print (e )
153157 raise
154158
159+
155160def lambda_handler (event , context ):
156161 start_time = datetime .now (timezone .utc )
157162 main ()
158163 end_time = datetime .now (timezone .utc )
159164 print (f"Running time is { (end_time - start_time ).total_seconds () * 1000 / 60000 :.2f} min" )
160165 return "Process completed successfully"
161166
167+
162168if __name__ == "__main__" :
163169 lambda_handler ({}, {})
164-
0 commit comments