Skip to content

Commit 9541705

Browse files
authored
Merge pull request #576 from ddps-lab/aws/t3-fix
신규 AWS Collector 전환 이후 T2, T3 저장 오류 수정
2 parents 0c07eb0 + da5438e commit 9541705

4 files changed

Lines changed: 78 additions & 49 deletions

File tree

collector/spot-dataset/aws/ec2/sps/sps_query_api.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,16 @@ def query_sps(args):
7070
sps_dict["AZ"].append(score['AvailabilityZoneId'])
7171
sps_dict["SPS"].append(int(score["Score"]))
7272
sps_dict["TargetCapacity"].append(target_capacity)
73+
7374
if score['Score'] == 3:
7475
sps_dict["T3"].append(target_capacity)
7576
else:
7677
sps_dict["T3"].append(0)
77-
if score['Score'] == 1:
78-
sps_dict["T2"].append(0)
79-
else:
78+
79+
if score['Score'] == 2:
8080
sps_dict["T2"].append(target_capacity)
81+
else:
82+
sps_dict["T2"].append(0)
8183

8284
return pd.DataFrame(sps_dict)
8385

collector/spot-dataset/aws/lambda/post_processing_data/compare_data.py

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
# compare previous collected workload with current collected workload
99
# return changed workload
1010
def compare(previous_df, current_df, workload_cols, feature_cols):
11-
previous_df.loc[:,'Workload'] = previous_df[workload_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1)
12-
previous_df.loc[:,'Feature'] = previous_df[feature_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1)
13-
current_df.loc[:,'Workload'] = current_df[workload_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1)
14-
current_df.loc[:,'Feature'] = current_df[feature_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1)
11+
previous_df.loc[:,"Workload"] = previous_df[workload_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
12+
previous_df.loc[:,"Feature"] = previous_df[feature_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
13+
current_df.loc[:,"Workload"] = current_df[workload_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
14+
current_df.loc[:,"Feature"] = current_df[feature_cols].apply(lambda row: ":".join(row.values.astype(str)), axis=1)
1515

16-
current_indices = current_df[['Workload', 'Feature']].sort_values(by='Workload').index
17-
current_values = current_df[['Workload', 'Feature']].sort_values(by='Workload').values
18-
previous_indices = previous_df[['Workload', 'Feature']].sort_values(by='Workload').index
19-
previous_values = previous_df[['Workload', 'Feature']].sort_values(by='Workload').values
16+
current_indices = current_df[["Workload", "Feature"]].sort_values(by="Workload").index
17+
current_values = current_df[["Workload", "Feature"]].sort_values(by="Workload").values
18+
previous_indices = previous_df[["Workload", "Feature"]].sort_values(by="Workload").index
19+
previous_values = previous_df[["Workload", "Feature"]].sort_values(by="Workload").values
2020

2121
changed_indices = []
2222
removed_indices = []
@@ -35,7 +35,7 @@ def compare(previous_df, current_df, workload_cols, feature_cols):
3535
else:
3636
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
3737
print(f"{prev_workload}, {curr_workload} workload error")
38-
raise Exception('workload error')
38+
raise Exception("workload error")
3939
break
4040
elif prev_idx == len(previous_indices):
4141
curr_workload = current_values[curr_idx][0]
@@ -47,7 +47,7 @@ def compare(previous_df, current_df, workload_cols, feature_cols):
4747
else:
4848
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
4949
print(f"{prev_workload}, {curr_workload} workload error")
50-
raise Exception('workload error')
50+
raise Exception("workload error")
5151
break
5252

5353
prev_workload = previous_values[prev_idx][0]
@@ -66,53 +66,75 @@ def compare(previous_df, current_df, workload_cols, feature_cols):
6666
else:
6767
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
6868
print(f"{prev_workload}, {curr_workload} workload error")
69-
raise Exception('workload error')
69+
raise Exception("workload error")
7070
else:
7171
if prev_feature != curr_feature:
7272
changed_indices.append(current_indices[curr_idx])
7373
curr_idx += 1
7474
prev_idx += 1
75-
changed_df = current_df.loc[changed_indices].drop(['Workload', 'Feature'], axis=1)
76-
removed_df = previous_df.loc[removed_indices].drop(['Workload', 'Feature'], axis=1)
75+
changed_df = current_df.loc[changed_indices].drop(["Workload", "Feature"], axis=1)
76+
removed_df = previous_df.loc[removed_indices].drop(["Workload", "Feature"], axis=1)
7777

7878
for col in feature_cols:
7979
removed_df[col] = 0
8080

81-
# removed_df have one more column, 'Ceased'
82-
removed_df['Ceased'] = True
81+
# removed_df have one more column, "Ceased"
82+
removed_df["Ceased"] = True
8383

8484
return changed_df, removed_df
8585

8686
# ------ Compare the values of T3 and T2 ------
87-
def compare_max_instance(merge_df, previous_df, target_capacity):
88-
condition = (previous_df['InstanceType'] == merge_df['InstanceType']) & (previous_df['AZ'] == merge_df['AZ'])
89-
current_df = merge_df
87+
def compare_max_instance(previous_df, new_df, target_capacity):
88+
fallback_dict = {50:45, 45:40, 40:35, 35:30, 30:25, 25:20, 20:15, 15:10, 10:5, 5:1, 1:0}
89+
fallback_val = fallback_dict.get(target_capacity, 0)
9090

91-
current_df.loc[condition, 'T3'] = np.maximum(
92-
previous_df.loc[condition, 'T3'], merge_df.loc[condition, 'T3']
91+
spotlake_df = new_df.copy()
92+
93+
merged_df = pd.merge(
94+
spotlake_df,
95+
previous_df[["InstanceType", "AZ", "SPS", "T3", "T2"]],
96+
on=["InstanceType", "AZ"],
97+
how="left",
98+
suffixes=("", "_prev")
99+
)
100+
101+
# Fix SPS when single node SPS
102+
if target_capacity == 1:
103+
merged_df["SPS"] = merged_df["SPS"].combine_first(merged_df["SPS_prev"])
104+
105+
# Merge single node SPS with multi node SPS if (multi node SPS) > (single node SPS)
106+
merged_df.loc[(merged_df["SPS"] > merged_df["SPS_prev"]), "SPS_prev"] = merged_df["SPS"]
107+
108+
# Calculate T3
109+
merged_df["T3"] = np.where(
110+
merged_df["SPS"] >= 3,
111+
np.maximum(merged_df["T3"], merged_df["T3_prev"]),
112+
np.minimum(fallback_val, merged_df["T3_prev"])
93113
)
94-
current_df.loc[condition, 'T2'] = np.maximum(
95-
previous_df.loc[condition, 'T2'], merge_df.loc[condition, 'T2']
114+
115+
# Calculate T2
116+
merged_df["T2"] = np.where(
117+
merged_df["SPS"] >= 2,
118+
np.maximum(merged_df["T2"], merged_df["T2_prev"]),
119+
np.minimum(fallback_val, merged_df["T2_prev"])
96120
)
97121

98-
current_df.loc[condition & (merge_df['T3'] == target_capacity), 'T2'] = target_capacity
99-
100122
if target_capacity == 1:
101-
current_df.loc[condition & (merge_df['T3'] == 0), 'T3'] = 0
102-
current_df.loc[condition & (merge_df['T2'] == 0), 'T2'] = 0
123+
# When SPS lower than condition, set T3 or T2 to 0
124+
merged_df.loc[merged_df["SPS"] <= 2, "T3"] = 0
125+
merged_df.loc[merged_df["SPS"] < 2, "T2"] = 0
103126
else:
104-
# Merging collection and previous data
105-
current_df = pd.merge(
106-
current_df,
107-
previous_df[['InstanceType', 'AZ', 'SPS']],
108-
on=['InstanceType', 'AZ'],
109-
how='left',
110-
suffixes=('', '_new')
111-
)
112-
# Overwrite SPS value of target capacity 1
113-
current_df['SPS_new'] = current_df['SPS_new'].dropna()
114-
current_df['SPS'] = current_df['SPS_new'].combine_first(current_df['SPS'])
115-
# Delete unnecessary column
116-
current_df = current_df.drop(columns=['SPS_new'])
127+
# When SPS lower than condition, set T3 or T2 to 0
128+
merged_df.loc[merged_df["SPS_prev"] <= 2, "T3"] = 0
129+
merged_df.loc[merged_df["SPS_prev"] < 2, "T2"] = 0
130+
# Fix SPS to Single node SPS
131+
merged_df["SPS"] = merged_df["SPS_prev"]
132+
133+
# Convert to int
134+
for col in ["SPS", "T2", "T3"]:
135+
merged_df[col] = merged_df[col].astype("Int64")
136+
137+
# Drop unnecessary columns
138+
merged_df.drop(columns=["T3_prev", "T2_prev", "SPS_prev"], inplace=True)
117139

118-
return current_df
140+
return merged_df

collector/spot-dataset/aws/lambda/post_processing_data/lambda_function.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from upload_data import upload_timestream, update_latest, save_raw, update_query_selector, update_config
1212
from compare_data import compare, compare_max_instance
1313

14+
class FirstRunError(Exception):
15+
pass
16+
1417
def main():
1518
print("Start Lambda Function")
1619
start_time = datetime.now(timezone.utc)
@@ -100,10 +103,10 @@ def main():
100103
existing_columns = [col for col in columns_to_check if col in previous_df.columns]
101104

102105
if len(existing_columns) == 0:
103-
raise
106+
raise FirstRunError("Can't load the previous df from s3 bucket or First run since changing the collector")
104107
else:
105108
previous_df = previous_df.drop(columns=['Id'])
106-
except:
109+
except FirstRunError as e:
107110
# If system is first time uploading data, make a new one and upload it to TSDB
108111
update_latest(merge_df, TIMESTAMP)
109112
save_raw(merge_df, TIMESTAMP)
@@ -118,7 +121,7 @@ def main():
118121
start_time = datetime.now(timezone.utc)
119122

120123
# ------ Compare T3 and T2 Data ------
121-
current_df = compare_max_instance(merge_df, previous_df, target_capacity)
124+
current_df = compare_max_instance(previous_df, merge_df, target_capacity)
122125

123126
# ------ Upload Merge DF to s3 Bucket ------
124127
update_latest(current_df, TIMESTAMP)
@@ -154,4 +157,8 @@ def lambda_handler(event, context):
154157
main()
155158
end_time = datetime.now(timezone.utc)
156159
print(f"Running time is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
157-
return "Process completed successfully"
160+
return "Process completed successfully"
161+
162+
if __name__ == "__main__":
163+
lambda_handler({}, {})
164+

collector/spot-dataset/aws/lambda/post_processing_data/upload_data.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
max_pool_connections=5000, retries={'max_attempts': 10}))
1818

1919
# Submit Batch To Timestream
20-
21-
2220
def submit_batch(records, counter, recursive):
2321
if recursive == 10:
2422
return

0 commit comments

Comments
 (0)