Skip to content

Commit 31299b6

Browse files
committed
feat(azure): implement compaction request handling and logging in merge process
1 parent bba8753 commit 31299b6

2 files changed

Lines changed: 59 additions & 8 deletions

File tree

collector/spot-dataset/azure/batch/merge/merge_data.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import argparse
44
import boto3
5+
import json
56
import pickle
67
import gzip
78
import gc
@@ -52,6 +53,29 @@ def _stage_log(stage: str, *, extra: str = "") -> None:
5253
suffix = f" {extra}" if extra else ""
5354
Logger.info(f"[TITANS/{PROVIDER}] {stage} rss_mb={_rss_mb():.1f}{suffix}")
5455

56+
57+
def _queue_compaction_request(
58+
request_path: str,
59+
*,
60+
hot_key: str,
61+
timestamp: datetime,
62+
timeout_seconds: float,
63+
) -> None:
64+
"""Persist a compaction request for the shell to run in a fresh process."""
65+
request_file = Path(request_path)
66+
request_file.parent.mkdir(parents=True, exist_ok=True)
67+
request_file.write_text(
68+
json.dumps(
69+
{
70+
"provider": PROVIDER,
71+
"hot_key": hot_key,
72+
"timestamp": timestamp.isoformat(),
73+
"timeout_seconds": timeout_seconds,
74+
}
75+
),
76+
encoding="utf-8",
77+
)
78+
5579
def merge_if_saving_price_sps_df(price_saving_if_df, sps_df, az=True):
5680
# Ensure join keys are present and types match
5781
join_df = pd.merge(price_saving_if_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer')
@@ -469,13 +493,30 @@ def main():
469493
extra=f"elapsed_s={time.time() - hot_started:.2f} hot_key={hot_key}",
470494
)
471495
if hot_key:
472-
compact_started = time.time()
473-
_stage_log("run_compaction start", extra=f"hot_key={hot_key}")
474-
run_compaction(hot_key, ts_utc, provider=PROVIDER, timeout_seconds=30.0, s3_client=titans_s3)
475-
_stage_log(
476-
"run_compaction end",
477-
extra=f"elapsed_s={time.time() - compact_started:.2f} hot_key={hot_key}",
478-
)
496+
request_path = os.environ.get("TITANS_COMPACTION_REQUEST_PATH", "").strip()
497+
if request_path:
498+
_stage_log(
499+
"run_compaction handoff start",
500+
extra=f"hot_key={hot_key} request_path={request_path}",
501+
)
502+
_queue_compaction_request(
503+
request_path,
504+
hot_key=hot_key,
505+
timestamp=ts_utc,
506+
timeout_seconds=30.0,
507+
)
508+
_stage_log(
509+
"run_compaction handoff end",
510+
extra=f"hot_key={hot_key} request_path={request_path}",
511+
)
512+
else:
513+
compact_started = time.time()
514+
_stage_log("run_compaction start", extra=f"hot_key={hot_key}")
515+
run_compaction(hot_key, ts_utc, provider=PROVIDER, timeout_seconds=30.0, s3_client=titans_s3)
516+
_stage_log(
517+
"run_compaction end",
518+
extra=f"elapsed_s={time.time() - compact_started:.2f} hot_key={hot_key}",
519+
)
479520
_stage_log("success")
480521
Logger.info(f"[TITANS/{PROVIDER}] Successfully uploaded")
481522
else:

collector/spot-dataset/azure/batch/scripts/run_collection.sh

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
set -e
33

44
TIMESTAMP=$1
5+
SAFE_TIMESTAMP=$(printf '%s' "$TIMESTAMP" | tr -c 'A-Za-z0-9_.-' '_')
6+
COMPACTION_REQUEST_FILE="/tmp/titans_compaction_${SAFE_TIMESTAMP}.json"
57

68
# Capture start time
79
START_TIME_READABLE=$(date "+%Y-%m-%d %H:%M:%S")
@@ -86,7 +88,15 @@ if [ $STATUS_SPS -eq 0 ] && [ $STATUS_IF -eq 0 ] && [ $STATUS_PRICE -eq 0 ]; the
8688
echo "Found SPS Key: $SPS_KEY"
8789

8890
echo "Starting Merge Job..."
89-
python3 collector/spot-dataset/azure/batch/merge/merge_data.py --sps_key "$SPS_KEY"
91+
rm -f "$COMPACTION_REQUEST_FILE"
92+
TITANS_COMPACTION_REQUEST_PATH="$COMPACTION_REQUEST_FILE" \
93+
python3 collector/spot-dataset/azure/batch/merge/merge_data.py --sps_key "$SPS_KEY"
94+
95+
if [ -f "$COMPACTION_REQUEST_FILE" ]; then
96+
echo "Starting TITANS Warm Compaction Job..."
97+
python3 collector/titans_common/run_compaction_request.py --request "$COMPACTION_REQUEST_FILE"
98+
rm -f "$COMPACTION_REQUEST_FILE"
99+
fi
90100

91101
MERGE_END_TIME_EPOCH=$(date +%s)
92102
MERGE_DURATION=$((MERGE_END_TIME_EPOCH - COLLECTION_END_TIME_EPOCH))

0 commit comments

Comments
 (0)