Skip to content

Commit d797e35

Browse files
authored
Merge pull request #577 from ddps-lab/aws/manual-rawdata-merge
수동 rawData merge Code 추가
2 parents cf2cb36 + 5d60f5d commit d797e35

4 files changed

Lines changed: 420 additions & 0 deletions

File tree

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# ------ import module ------
2+
import pandas as pd
3+
import numpy as np
4+
5+
# ------ import user module ------
6+
from slack_msg_sender import send_slack_message
7+
8+
# compare previous collected workload with current collected workload
9+
# return changed workload
10+
11+
12+
def compare(previous_df, current_df, workload_cols, feature_cols):
13+
previous_df.loc[:, "Workload"] = previous_df[workload_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
14+
previous_df.loc[:, "Feature"] = previous_df[feature_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
15+
current_df.loc[:, "Workload"] = current_df[workload_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
16+
current_df.loc[:, "Feature"] = current_df[feature_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
17+
18+
current_indices = current_df[["Workload", "Feature"]].sort_values(by="Workload").index
19+
current_values = current_df[["Workload", "Feature"]].sort_values(by="Workload").values
20+
previous_indices = previous_df[["Workload", "Feature"]].sort_values(by="Workload").index
21+
previous_values = previous_df[["Workload", "Feature"]].sort_values(by="Workload").values
22+
23+
changed_indices = []
24+
removed_indices = []
25+
26+
prev_idx = 0
27+
curr_idx = 0
28+
while True:
29+
if (curr_idx == len(current_indices)) and (prev_idx == len(previous_indices)):
30+
break
31+
elif curr_idx == len(current_indices):
32+
prev_workload = previous_values[prev_idx][0]
33+
if prev_workload not in current_values[:, 0]:
34+
removed_indices.append(previous_indices[prev_idx])
35+
prev_idx += 1
36+
continue
37+
else:
38+
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
39+
print(f"{prev_workload}, {curr_workload} workload error")
40+
raise Exception("workload error")
41+
break
42+
elif prev_idx == len(previous_indices):
43+
curr_workload = current_values[curr_idx][0]
44+
curr_feature = current_values[curr_idx][1]
45+
if curr_workload not in previous_values[:, 0]:
46+
changed_indices.append(current_indices[curr_idx])
47+
curr_idx += 1
48+
continue
49+
else:
50+
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
51+
print(f"{prev_workload}, {curr_workload} workload error")
52+
raise Exception("workload error")
53+
break
54+
55+
prev_workload = previous_values[prev_idx][0]
56+
prev_feature = previous_values[prev_idx][1]
57+
curr_workload = current_values[curr_idx][0]
58+
curr_feature = current_values[curr_idx][1]
59+
60+
if prev_workload != curr_workload:
61+
if curr_workload not in previous_values[:, 0]:
62+
changed_indices.append(current_indices[curr_idx])
63+
curr_idx += 1
64+
elif prev_workload not in current_values[:, 0]:
65+
removed_indices.append(previous_indices[prev_idx])
66+
prev_idx += 1
67+
continue
68+
else:
69+
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
70+
print(f"{prev_workload}, {curr_workload} workload error")
71+
raise Exception("workload error")
72+
else:
73+
if prev_feature != curr_feature:
74+
changed_indices.append(current_indices[curr_idx])
75+
curr_idx += 1
76+
prev_idx += 1
77+
changed_df = current_df.loc[changed_indices].drop(["Workload", "Feature"], axis=1)
78+
removed_df = previous_df.loc[removed_indices].drop(["Workload", "Feature"], axis=1)
79+
80+
for col in feature_cols:
81+
removed_df[col] = 0
82+
83+
# removed_df have one more column, "Ceased"
84+
removed_df["Ceased"] = True
85+
86+
return changed_df, removed_df
87+
88+
# ------ Compare the values of T3 and T2 ------
89+
90+
91+
def compare_max_instance(previous_df, new_df, target_capacity):
92+
fallback_dict = {50: 45, 45: 40, 40: 35, 35: 30, 30: 25, 25: 20, 20: 15, 15: 10, 10: 5, 5: 1, 1: 0}
93+
fallback_val = fallback_dict.get(target_capacity, 0)
94+
95+
spotlake_df = new_df.copy()
96+
97+
merged_df = pd.merge(
98+
spotlake_df,
99+
previous_df[["InstanceType", "AZ", "SPS", "T3", "T2"]],
100+
on=["InstanceType", "AZ"],
101+
how="left",
102+
suffixes=("", "_prev")
103+
)
104+
105+
# Fix SPS when single node SPS
106+
if target_capacity == 1:
107+
merged_df["SPS"] = merged_df["SPS"].combine_first(merged_df["SPS_prev"])
108+
109+
# Merge single node SPS with multi node SPS if (multi node SPS) > (single node SPS)
110+
merged_df.loc[(merged_df["SPS"] > merged_df["SPS_prev"]), "SPS_prev"] = merged_df["SPS"]
111+
112+
# Calculate T3
113+
merged_df["T3"] = np.where(
114+
merged_df["SPS"] >= 3,
115+
np.maximum(merged_df["T3"], merged_df["T3_prev"]),
116+
np.minimum(fallback_val, merged_df["T3_prev"])
117+
)
118+
119+
# Calculate T2
120+
merged_df["T2"] = np.where(
121+
merged_df["SPS"] >= 2,
122+
np.maximum(merged_df["T2"], merged_df["T2_prev"]),
123+
np.minimum(fallback_val, merged_df["T2_prev"])
124+
)
125+
126+
if target_capacity == 1:
127+
# When SPS lower than condition, set T3 or T2 to 0
128+
merged_df.loc[merged_df["SPS"] <= 2, "T3"] = 0
129+
merged_df.loc[merged_df["SPS"] < 2, "T2"] = 0
130+
else:
131+
# When SPS lower than condition, set T3 or T2 to 0
132+
merged_df.loc[merged_df["SPS_prev"] <= 2, "T3"] = 0
133+
merged_df.loc[merged_df["SPS_prev"] < 2, "T2"] = 0
134+
# Fix SPS to Single node SPS
135+
merged_df["SPS"] = merged_df["SPS_prev"]
136+
137+
# Convert to int
138+
for col in ["SPS", "T2", "T3"]:
139+
merged_df[col] = merged_df[col].astype("Int64")
140+
141+
# Drop unnecessary columns
142+
merged_df.drop(columns=["T3_prev", "T2_prev", "SPS_prev"], inplace=True)
143+
144+
return merged_df
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# ------ import module ------
2+
from datetime import datetime, timezone, timedelta
3+
import boto3
4+
import pickle, gzip, json
5+
import pandas as pd
6+
import numpy as np
7+
import os
8+
import sys
9+
10+
# ------ import user module ------
11+
sys.path.append("/home/ubuntu/spotlake/utility")
12+
from slack_msg_sender import send_slack_message
13+
from upload_data import upload_timestream, update_latest, save_raw, update_query_selector, update_config
14+
from compare_data import compare, compare_max_instance
15+
16+
def process_timestamp(TIMESTAMP, BUCKET_NAME, BUCKET_FILE_PATH):
17+
S3_DIR_NAME = TIMESTAMP.strftime('%Y/%m/%d')
18+
S3_OBJECT_PREFIX = TIMESTAMP.strftime('%H-%M')
19+
20+
SPS_FILE_PREFIX = f"{BUCKET_FILE_PATH}/sps/{S3_DIR_NAME}"
21+
SPOTIF_FILE_NAME = f"{BUCKET_FILE_PATH}/spot_if/{S3_DIR_NAME}/{S3_OBJECT_PREFIX}_spot_if.pkl.gz"
22+
ONDEMAND_PRICE_FILE_NAME = f"{BUCKET_FILE_PATH}/ondemand_price/{S3_DIR_NAME}/ondemand_price.pkl.gz"
23+
SPOTPRICE_FILE_NAME = f"{BUCKET_FILE_PATH}/spot_price/{S3_DIR_NAME}/{S3_OBJECT_PREFIX}_spot_price.pkl.gz"
24+
25+
# ------ Set time data ------
26+
time_value = TIMESTAMP.strftime("%Y-%m-%d %H:%M:%S")
27+
print(f"Processing timestamp: {time_value}")
28+
try:
29+
start_time = datetime.now(timezone.utc)
30+
# ------ Create Boto3 Session ------
31+
s3 = boto3.resource("s3")
32+
s3_client = boto3.client('s3')
33+
34+
# ------ Find Sps File in S3 ------
35+
sps_file_list = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=SPS_FILE_PREFIX)
36+
sps_files = []
37+
for obj in sps_file_list['Contents']:
38+
if obj['Key'].startswith(f"{SPS_FILE_PREFIX}/{S3_OBJECT_PREFIX}"):
39+
sps_files.append(obj['Key'])
40+
41+
sps_file_name = sps_files[0]
42+
print(sps_file_name)
43+
target_capacity = int(sps_file_name.split('/')[-1].split('_')[2].split('.')[0])
44+
45+
# ------ Load Data from PKL File in S3 ------
46+
sps_df = pickle.load(gzip.open(s3.Object(BUCKET_NAME, sps_file_name).get()["Body"]))
47+
spotinfo_df = pickle.load(gzip.open(s3.Object(BUCKET_NAME, SPOTIF_FILE_NAME.strip()).get()["Body"]))
48+
ondemand_price_df = pickle.load(gzip.open(s3.Object(BUCKET_NAME, ONDEMAND_PRICE_FILE_NAME.strip()).get()["Body"]))
49+
spot_price_df = pickle.load(gzip.open(s3.Object(BUCKET_NAME, SPOTPRICE_FILE_NAME.strip()).get()["Body"]))
50+
51+
# ------ Create a DF by Selecting Only The Columns Required ------
52+
sps_df = sps_df[['InstanceType', 'Region', 'AZ', 'SPS', 'T3', 'T2']]
53+
spotinfo_df = spotinfo_df[['InstanceType', 'Region', 'IF']]
54+
ondemand_price_df = ondemand_price_df[['InstanceType', 'Region', 'OndemandPrice']]
55+
spot_price_df = spot_price_df[['InstanceType', 'AZ', 'SpotPrice']]
56+
57+
# ------ Formatting Data ------
58+
spot_price_df['SpotPrice'] = spot_price_df['SpotPrice'].astype('float').round(5)
59+
ondemand_price_df['OndemandPrice'] = ondemand_price_df['OndemandPrice'].astype('float').round(5)
60+
61+
# ------ Need to Change to Outer Join ------
62+
merge_df = pd.merge(sps_df, spotinfo_df, how="outer")
63+
merge_df = pd.merge(merge_df, ondemand_price_df, how="outer")
64+
merge_df = pd.merge(merge_df, spot_price_df, how="outer")
65+
66+
merge_df['Savings'] = 100.0 - (merge_df['SpotPrice'] * 100 / merge_df['OndemandPrice'])
67+
merge_df['Savings'] = merge_df['Savings'].fillna(-1)
68+
merge_df['SPS'] = merge_df['SPS'].fillna(-1)
69+
merge_df['SpotPrice'] = merge_df['SpotPrice'].fillna(-1)
70+
merge_df['OndemandPrice'] = merge_df['OndemandPrice'].fillna(-1)
71+
merge_df['IF'] = merge_df['IF'].fillna(-1)
72+
73+
merge_df['Savings'] = merge_df['Savings'].astype('int')
74+
merge_df['SPS'] = merge_df['SPS'].astype('int')
75+
merge_df['T3'] = merge_df['T3'].fillna(0).astype('int')
76+
merge_df['T2'] = merge_df['T2'].fillna(0).astype('int')
77+
78+
merge_df = merge_df.drop(merge_df[(merge_df['AZ'].isna()) | (merge_df['Region'].isna()) | (merge_df['InstanceType'].isna())].index)
79+
80+
merge_df.reset_index(drop=True, inplace=True)
81+
merge_df['Time'] = time_value
82+
83+
end_time = datetime.now(timezone.utc)
84+
print(f"Merging time is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
85+
86+
# ------ Check The Previous DF File in S3 and Local ------
87+
previous_df = None
88+
start_time = datetime.now(timezone.utc)
89+
filename = '/home/ubuntu/spotlake/utility/manual_merge_aws_rawdata/latest_aws.json'
90+
91+
previous_df = pd.DataFrame(json.load(open(filename, 'r')))
92+
93+
previous_df = previous_df.drop(columns=['id'])
94+
print(previous_df)
95+
96+
end_time = datetime.now(timezone.utc)
97+
print(f"Checking time of previous json file is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
98+
99+
start_time = datetime.now(timezone.utc)
100+
101+
# ------ Compare T3 and T2 Data ------
102+
current_df = compare_max_instance(previous_df, merge_df, target_capacity)
103+
104+
# # ------ Upload Merge DF to s3 Bucket ------
105+
update_latest(current_df, TIMESTAMP)
106+
save_raw(current_df, TIMESTAMP)
107+
108+
# ------ Compare All Data ------
109+
workload_cols = ['InstanceType', 'Region', 'AZ']
110+
feature_cols = ['SPS', 'T3', 'T2', 'IF', 'SpotPrice', 'OndemandPrice']
111+
112+
changed_df, removed_df = compare(previous_df, current_df, workload_cols, feature_cols) # compare previous_df and current_df to extract changed rows)
113+
end_time = datetime.now(timezone.utc)
114+
print(f"Compare time is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
115+
116+
# # ------ Upload TSDB ------
117+
# start_time = datetime.now(timezone.utc)
118+
# upload_timestream(changed_df, TIMESTAMP)
119+
# upload_timestream(removed_df, TIMESTAMP)
120+
# end_time = datetime.now(timezone.utc)
121+
# print(f"Uploading time to TSDB is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
122+
123+
except Exception as e:
124+
send_slack_message(e)
125+
print(e)
126+
127+
def main():
128+
print("Start Lambda Function")
129+
send_slack_message("수동 데이터 CSV 병합이 시작되었습니다!")
130+
start_time = datetime.now(timezone.utc)
131+
132+
# ------ Set Constants ------
133+
BUCKET_NAME = "spotlake"
134+
BUCKET_FILE_PATH = "rawdata/aws"
135+
136+
START_DATE = datetime(2025, 2, 15, 0, 10, 0, tzinfo=timezone.utc)
137+
END_DATE = datetime(2025, 4, 4, 0, 0, 0, tzinfo=timezone.utc)
138+
139+
current_time = START_DATE
140+
while current_time <= END_DATE:
141+
TIMESTAMP = current_time.replace(minute=((current_time.minute // 10) * 10), second=0) - timedelta(minutes=10)
142+
process_timestamp(TIMESTAMP, BUCKET_NAME, BUCKET_FILE_PATH)
143+
current_time += timedelta(minutes=10)
144+
145+
end_time = datetime.now(timezone.utc)
146+
print(f"Total running time is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
147+
148+
def lambda_handler(event, context):
149+
start_time = datetime.now(timezone.utc)
150+
print("Lambda handler invoked")
151+
main()
152+
end_time = datetime.now(timezone.utc)
153+
print(f"Running time is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
154+
return "Process completed successfully"
155+
156+
if __name__ == "__main__":
157+
lambda_handler(None, None)
158+
send_slack_message("수동 데이터 CSV 병합이 완료되었습니다!")

utility/manual_merge_aws_rawdata/latest_aws.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)