Skip to content

Commit f747dbc

Browse files
authored
Merge pull request #620 from ddps-lab/titans
Titans 기능 적용
2 parents d8a7857 + 31299b6 commit f747dbc

82 files changed

Lines changed: 6068 additions & 501 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

collector/run_cleanup_orphans.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash
2+
# TITANS Orphan Cleanup Script
3+
# orphan = manifest에 없는 S3 잔여 파일 (IAM 권한 누락 등으로 삭제 실패 시 누적)
4+
5+
# --- Test 환경 ---
6+
7+
# Dry run (기본값, 삭제하지 않고 목록만 출력)
8+
TITANS_ENV=test uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --profile spotrank
9+
10+
# 실제 삭제
11+
# TITANS_ENV=test uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --profile spotrank --execute
12+
13+
# --- Production 환경 ---
14+
15+
# TITANS_ENV=production uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --profile spotrank
16+
# TITANS_ENV=production uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --profile spotrank --execute
17+
18+
# --- 특정 provider ---
19+
20+
# TITANS_ENV=test uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --provider azure --profile spotrank
21+
# TITANS_ENV=test uv run python -m titans_common.cleanup_orphans --year 2026 --month 2 --provider gcp --profile spotrank

collector/spot-dataset/aws/batch-test/Dockerfile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,16 @@ RUN pip install --no-cache-dir \
5252
requests \
5353
azure-identity \
5454
azure-core \
55-
scikit-learn
55+
scikit-learn \
56+
polars>=1.37.0 \
57+
pyarrow
5658

5759
# Copy utility module (shared by both AWS and Azure)
5860
COPY utility /app/utility
5961

62+
# Copy titans_common module (TITANS Hot/Warm tier integration)
63+
COPY collector/titans_common /app/collector/titans_common
64+
6065
# Copy AWS test collector code
6166
COPY collector/spot-dataset/aws/batch-test /app/collector/spot-dataset/aws/batch-test
6267
RUN chmod +x /app/collector/spot-dataset/aws/batch-test/scripts/run_collection.sh
@@ -67,6 +72,7 @@ RUN chmod +x /app/collector/spot-dataset/azure/batch-test/scripts/run_collection
6772

6873
# Set PYTHONPATH to include /app so imports work
6974
ENV PYTHONPATH=/app
75+
ENV PANDAS_FUTURE_INFER_STRING=0
7076

7177
# Default entrypoint (can be overridden)
7278
CMD ["python3", "--version"]

collector/spot-dataset/aws/batch-test/infrastructure/main.tf

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ resource "aws_iam_policy" "batch_job_policy" {
102102
policy = jsonencode({
103103
Version = "2012-10-17"
104104
Statement = [
105+
# Primary bucket (read) and test bucket (write)
105106
{
106107
Effect = "Allow"
107108
Action = [
@@ -112,7 +113,23 @@ resource "aws_iam_policy" "batch_job_policy" {
112113
]
113114
Resource = [
114115
"arn:aws:s3:::${var.s3_bucket}",
115-
"arn:aws:s3:::${var.s3_bucket}/*"
116+
"arn:aws:s3:::${var.s3_bucket}/*",
117+
"arn:aws:s3:::${var.s3_bucket}-test",
118+
"arn:aws:s3:::${var.s3_bucket}-test/*"
119+
]
120+
},
121+
# TITANS Hot/Warm tier bucket (parquet storage)
122+
{
123+
Effect = "Allow"
124+
Action = [
125+
"s3:GetObject",
126+
"s3:PutObject",
127+
"s3:DeleteObject",
128+
"s3:ListBucket"
129+
]
130+
Resource = [
131+
"arn:aws:s3:::${var.titans_bucket}",
132+
"arn:aws:s3:::${var.titans_bucket}/*"
116133
]
117134
},
118135
{

collector/spot-dataset/aws/batch-test/infrastructure/variables.tf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ variable "s3_bucket" {
2525
default = "spotlake"
2626
}
2727

28+
variable "titans_bucket" {
29+
description = "S3 Bucket for TITANS Hot/Warm tier parquet data"
30+
type = string
31+
default = "titans-spotlake-data"
32+
}
33+
2834
variable "image_uri" {
2935
description = "Docker Image URI for Batch Jobs"
3036
type = string

collector/spot-dataset/aws/batch-test/merge/compare_data.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ def compare(previous_df, current_df, workload_cols, feature_cols):
3434
prev_idx += 1
3535
continue
3636
else:
37-
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
38-
print(f"{prev_workload}, {curr_workload} workload error")
37+
send_slack_message(f"{prev_workload} workload error (current array exhausted)")
38+
print(f"{prev_workload} workload error (current array exhausted)")
3939
raise Exception("workload error")
4040
elif prev_idx == len(previous_indices):
4141
curr_workload = current_values[curr_idx][0]
@@ -44,8 +44,8 @@ def compare(previous_df, current_df, workload_cols, feature_cols):
4444
curr_idx += 1
4545
continue
4646
else:
47-
send_slack_message(f"{prev_workload}, {curr_workload} workload error")
48-
print(f"{prev_workload}, {curr_workload} workload error")
47+
send_slack_message(f"{curr_workload} workload error (previous array exhausted)")
48+
print(f"{curr_workload} workload error (previous array exhausted)")
4949
raise Exception("workload error")
5050

5151
prev_workload = previous_values[prev_idx][0]
@@ -135,7 +135,7 @@ def compare_max_instance(previous_df, new_df, target_capacity):
135135

136136
# Convert to int
137137
for col in ["SPS", "T2", "T3"]:
138-
merged_df[col] = merged_df[col].astype("Int64")
138+
merged_df[col] = merged_df[col].astype("int64")
139139

140140
# Drop unnecessary columns
141141
merged_df.drop(columns=["T3_prev", "T2_prev", "SPS_prev"], inplace=True)

collector/spot-dataset/aws/batch-test/merge/merge_data.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@
66
import json
77
import pandas as pd
88
import argparse
9+
import os
10+
import sys
11+
from pathlib import Path
12+
13+
# TITANS environment setup (Test) - set before imports
14+
os.environ.setdefault("TITANS_ENV", "test")
15+
16+
# Add titans_common path (merge -> batch-test -> aws -> spot-dataset -> collector)
17+
COLLECTOR_ROOT = Path(__file__).resolve().parents[4]
18+
sys.path.insert(0, str(COLLECTOR_ROOT))
19+
20+
from titans_common.upload_titans import upload_hot_tier
21+
from titans_common.warm_compactor import run_compaction, ConcurrencyConflictError
22+
from titans_common.utils import prepare_for_upload
23+
24+
PROVIDER = "aws" # Provider constant
925

1026
# ------ import user module ------
1127
from utility.slack_msg_sender import send_slack_message
@@ -235,6 +251,29 @@ def main():
235251
end_time = datetime.now(timezone.utc)
236252
print(f"Uploading time to TSDB is {(end_time - start_time).total_seconds() * 1000 / 60000:.2f} min")
237253

254+
# ------ TITANS Hot tier upload + Warm compaction (Test environment) ------
255+
try:
256+
# Merge changed_df + removed_df (with Ceased column alignment)
257+
# Note: -1 sentinel values are NOT filtered — same behavior as Timestream's
258+
# upload_timestream(data.dropna()). The fillna(-1) values pass through to
259+
# TITANS, matching Timestream TSDB semantics.
260+
combined_df = prepare_for_upload(changed_df, removed_df, pk_columns=workload_cols)
261+
262+
# Ensure timezone-aware (TIMESTAMP must be timezone-aware)
263+
ts_utc = TIMESTAMP if TIMESTAMP.tzinfo else TIMESTAMP.replace(tzinfo=timezone.utc)
264+
265+
if not combined_df.empty:
266+
titans_s3 = boto3.client("s3")
267+
hot_key = upload_hot_tier(combined_df, ts_utc, provider=PROVIDER, s3_client=titans_s3)
268+
if hot_key:
269+
run_compaction(hot_key, ts_utc, provider=PROVIDER, timeout_seconds=30.0, s3_client=titans_s3)
270+
print(f"[TITANS/{PROVIDER}/TEST] Successfully uploaded to test environment")
271+
272+
except ConcurrencyConflictError as e:
273+
print(f"[TITANS/{PROVIDER}/TEST] Concurrency conflict, will retry next cycle: {e}")
274+
except Exception as e:
275+
print(f"[TITANS/{PROVIDER}/TEST] Failed (non-fatal): {e}")
276+
238277
# ------ Upload Spotlake Query Selector to S3 ------
239278
start_time = datetime.now(timezone.utc)
240279
update_query_selector(changed_df)

collector/spot-dataset/aws/batch-test/merge/upload_data.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
# ------ import user module ------
99
from utility.utils import get_region
10+
from utility.slack_msg_sender import send_slack_message
1011

1112
BUCKET_NAME = "spotlake-test"
1213
S3_PATH_PREFIX = "rawdata/aws"

0 commit comments

Comments
 (0)