Skip to content

Commit a01a3ac

Browse files
committed
debug(azure-test): add upload_debug_dataframe function for S3 debug artifact uploads
1 parent 3c7c51f commit a01a3ac

2 files changed

Lines changed: 52 additions & 5 deletions

File tree

collector/spot-dataset/azure/batch-test/merge/merge_data.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,27 @@
3030
PROVIDER = "azure"
3131
os.environ.setdefault("TITANS_ENV", "test")
3232
TITANS_ENABLED = os.environ.get("TITANS_ENABLED", "1") == "1"
33+
DEBUG_ARTIFACTS_ENABLED = True
34+
DEBUG_S3_PREFIX = "rawdata/azure/debug"
35+
36+
37+
def upload_debug_dataframe(s3_client, df, timestamp_utc, stage):
38+
if not DEBUG_ARTIFACTS_ENABLED or df is None:
39+
return
40+
41+
date_path = timestamp_utc.strftime("%Y/%m/%d")
42+
time_str = timestamp_utc.strftime("%H-%M-%S")
43+
s3_key = f"{DEBUG_S3_PREFIX}/{stage}/{date_path}/{time_str}.pkl.gz"
44+
local_path = f"/tmp/{stage}_{time_str}.pkl.gz"
45+
46+
debug_df = df.copy()
47+
debug_df.to_pickle(local_path, compression='gzip')
48+
49+
with open(local_path, 'rb') as f:
50+
s3_client.upload_fileobj(f, STORAGE_CONST.WRITE_BUCKET_NAME, s3_key)
51+
52+
os.remove(local_path)
53+
Logger.info(f"Uploaded debug artifact: {s3_key}")
3354

3455
def merge_if_saving_price_sps_df(price_saving_if_df, sps_df, az=True):
3556
join_df = pd.merge(price_saving_if_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer')
@@ -258,8 +279,11 @@ def main():
258279
else:
259280
price_saving_if_df = pd.DataFrame(columns=['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF'])
260281

282+
upload_debug_dataframe(s3_client, price_saving_if_df, timestamp_utc, "merge_input_price_saving_if")
283+
261284
# Merge with SPS
262285
sps_merged_df = merge_if_saving_price_sps_df(price_saving_if_df, sps_df, az=True)
286+
upload_debug_dataframe(s3_client, sps_merged_df, timestamp_utc, "merge_output_sps_merged")
263287

264288
# Process prev_all_data (already loaded in parallel above)
265289
# CRITICAL: Filter prev_all_data to latest timestamp

collector/spot-dataset/azure/batch-test/price/collect_price.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
GET_PRICE_URL = "https://prices.azure.com/api/retail/prices?currencyCode='USD'&$filter=serviceName eq 'Virtual Machines' and priceType eq 'Consumption'&$skip="
2323
FILTER_LOCATIONS = ['GOV', 'DoD', 'China', 'Germany']
2424
MAX_SKIP = 2000
25+
DEBUG_ARTIFACTS_ENABLED = True
26+
DEBUG_S3_PREFIX = "rawdata/azure/debug"
2527

2628
# Globals for threading
2729
price_list = []
@@ -119,6 +121,25 @@ def preprocessing_price(df):
119121

120122
return join_df
121123

124+
125+
def upload_debug_dataframe(s3_client, df, timestamp_utc, stage):
126+
if not DEBUG_ARTIFACTS_ENABLED or df is None:
127+
return
128+
129+
date_path = timestamp_utc.strftime("%Y/%m/%d")
130+
time_str = timestamp_utc.strftime("%H-%M-%S")
131+
s3_key = f"{DEBUG_S3_PREFIX}/{stage}/{date_path}/{time_str}.pkl.gz"
132+
local_path = f"/tmp/{stage}_{time_str}.pkl.gz"
133+
134+
debug_df = df.copy()
135+
debug_df.to_pickle(local_path, compression='gzip')
136+
137+
with open(local_path, 'rb') as f:
138+
s3_client.upload_fileobj(f, STORAGE_CONST.WRITE_BUCKET_NAME, s3_key)
139+
140+
os.remove(local_path)
141+
print(f"Uploaded debug artifact: {s3_key}")
142+
122143
def collect_price_with_multithreading():
123144
global price_list, response_dict, event
124145
price_list = []
@@ -146,13 +167,13 @@ def collect_price_with_multithreading():
146167
send_slack_message(f"[Azure Collector]: {i} respones occurred {response_dict[i]} times.")
147168

148169
if not price_list:
149-
return pd.DataFrame()
170+
return pd.DataFrame(), pd.DataFrame()
150171

151172
price_df = pd.DataFrame(price_list)
152-
savings_df = preprocessing_price(price_df)
153-
savings_df = savings_df.drop_duplicates(subset=['InstanceTier', 'InstanceType', 'Region'], keep='first')
173+
pre_dedup_df = preprocessing_price(price_df)
174+
savings_df = pre_dedup_df.drop_duplicates(subset=['InstanceTier', 'InstanceType', 'Region'], keep='first')
154175

155-
return savings_df
176+
return pre_dedup_df, savings_df
156177

157178
def main():
158179
parser = argparse.ArgumentParser()
@@ -177,7 +198,7 @@ def main():
177198
try:
178199
start_time = datetime.now(timezone.utc)
179200

180-
savings_df = collect_price_with_multithreading()
201+
pre_dedup_df, savings_df = collect_price_with_multithreading()
181202

182203
if savings_df.empty:
183204
print("No price data collected.")
@@ -188,6 +209,8 @@ def main():
188209

189210
# Save to S3
190211
s3_client = boto3.client('s3')
212+
upload_debug_dataframe(s3_client, pre_dedup_df, timestamp_utc, "price_saving_if_pre_dedup")
213+
upload_debug_dataframe(s3_client, savings_df, timestamp_utc, "price_saving_if_post_dedup")
191214
s3_key = f"{S3_PATH_PREFIX}/{date_path}/{time_str}_spot_price.pkl.gz"
192215

193216
local_path = f"/tmp/{time_str}_spot_price.pkl.gz"

0 commit comments

Comments
 (0)