Skip to content

Commit d5f99f0

Browse files
jinsoojinsoo
authored and committed
feat: manage runpod lifecycle for labeling task in prefect flow
1 parent b69e6f3 commit d5f99f0

File tree

2 files changed

+205
-40
lines changed

2 files changed

+205
-40
lines changed

scripts/distill_flows.py

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
1818
실행:
1919
python scripts/distill_flows.py build_dataset [--input path] [--out-dir dir]
20+
python scripts/distill_flows.py labeling --train-path datasets/xxx/train.json
21+
python scripts/distill_flows.py labeling_with_pod --train-path datasets/xxx/train.json # Pod 생성→라벨링→삭제
2022
python scripts/distill_flows.py all
2123
"""
2224

@@ -33,8 +35,11 @@
3335

3436
import argparse
3537
import json
38+
import logging
39+
import os
3640
import subprocess
3741
import sys
42+
import time
3843
from datetime import datetime
3944
from pathlib import Path
4045

@@ -48,6 +53,16 @@
4853
_SCRIPT_DIR = Path(__file__).resolve().parent
4954
_PROJECT_ROOT = _SCRIPT_DIR.parent
5055

56+
# RunPodClient import (scripts/runpod_cli)
57+
if str(_SCRIPT_DIR) not in sys.path:
58+
sys.path.insert(0, str(_SCRIPT_DIR))
59+
try:
60+
from runpod_cli.pod_create_delete_cli import RunPodClient
61+
except ImportError:
62+
RunPodClient = None
63+
64+
logger = logging.getLogger(__name__)
65+
5166

5267
@task(name="build-dataset-task", log_prints=True)
5368
def build_dataset_task(
@@ -185,6 +200,123 @@ def labeling_flow(
185200
)
186201

187202

203+
def _wait_for_vllm_ready(base_url: str, timeout_sec: int = 180, poll_interval: int = 10) -> None:
204+
"""vLLM /v1/models 가 응답할 때까지 대기."""
205+
import requests
206+
url = base_url.rstrip("/") + "/v1/models"
207+
deadline = time.time() + timeout_sec
208+
while time.time() < deadline:
209+
try:
210+
r = requests.get(url, timeout=10)
211+
if r.status_code == 200:
212+
logger.info("vLLM ready: %s", url)
213+
return
214+
except Exception as e:
215+
logger.debug("vLLM not ready yet: %s", e)
216+
time.sleep(poll_interval)
217+
raise TimeoutError(f"vLLM at {base_url} did not become ready within {timeout_sec}s")
218+
219+
220+
@task(name="labeling-with-pod-task", log_prints=True)
def labeling_with_pod_task(
    train_path: str,
    val_path: str | None = None,
    test_path: str | None = None,
    openai_cap: int = 500,
    output_labeled_dir: str | None = None,
    pod_wait_timeout_sec: int = 600,
    vllm_ready_timeout_sec: int = 180,
) -> dict:
    """Create a RunPod pod, wait for vLLM, run labeling, then delete the pod.

    Requires the RUNPOD_API_KEY environment variable. Spins up a self-hosted
    vLLM teacher pod, runs ``label_for_distill.py`` against it, and always
    tears the pod down afterwards.

    Args:
        train_path: Path to the train split JSON to label.
        val_path: Optional val split; forwarded only if the file exists.
        test_path: Optional test split; forwarded only if the file exists.
        openai_cap: Cap on OpenAI (gold) labeling calls.
        output_labeled_dir: Output root; defaults to
            ``<project>/distill_pipeline_output``.
        pod_wait_timeout_sec: Max seconds to wait for the pod to reach RUNNING.
        vllm_ready_timeout_sec: Max seconds to wait for vLLM to respond.

    Returns:
        dict with ``labeled_version``, ``labeled_path`` and, when the files
        exist, ``val_labeled_path`` / ``test_labeled_path``.

    Raises:
        RuntimeError: If RunPodClient is unavailable, the pod gets no public
            IP, or the labeling subprocess exits non-zero.
        ValueError: If RUNPOD_API_KEY is not set.
    """
    if RunPodClient is None:
        raise RuntimeError("RunPodClient not available. Check runpod_cli import.")
    token = os.environ.get("RUNPOD_API_KEY")
    if not token:
        raise ValueError("RUNPOD_API_KEY environment variable is required for labeling_with_pod")

    client = RunPodClient(token=token)
    payload = RunPodClient.get_default_pod_payload()
    pod = client.create_pod(payload)
    pod_id = pod["id"]
    print("Pod created:", pod_id)

    try:
        ready = client.wait_until_running(pod_id, timeout_sec=pod_wait_timeout_sec)
        public_ip = ready.get("publicIp")
        if not public_ip:
            raise RuntimeError(f"Pod {pod_id} has no publicIp. Response: {ready}")

        # OpenAI-compatible clients expect the "/v1" suffix in their base URL,
        # but the readiness probe appends "/v1/models" itself — so it gets the
        # bare server URL (the original passed the "/v1"-suffixed URL and
        # polled ".../v1/v1/models").
        server_url = f"http://{public_ip}:8000"
        base_url = f"{server_url}/v1"
        print("Pod ready:", pod_id, "base_url:", base_url)

        _wait_for_vllm_ready(server_url, timeout_sec=vllm_ready_timeout_sec)

        env = os.environ.copy()
        env["VLLM_POD_BASE_URL"] = base_url
        env["USE_POD_VLLM"] = "true"
        env["LLM_PROVIDER"] = "runpod"

        out_dir = Path(output_labeled_dir or _PROJECT_ROOT / "distill_pipeline_output")
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # kept to match the file's existing `from datetime import datetime`.
        version = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        labeled_dir = out_dir / "labeled" / version
        labeled_dir.mkdir(parents=True, exist_ok=True)

        cmd = [
            sys.executable,
            str(_SCRIPT_DIR / "label_for_distill.py"),
            "--train-path", str(train_path),
            "--openai-cap", str(openai_cap),
            "--output-dir", str(labeled_dir),
        ]
        if val_path and Path(val_path).exists():
            cmd.extend(["--val-path", str(val_path)])
        if test_path and Path(test_path).exists():
            cmd.extend(["--test-path", str(test_path)])

        result = subprocess.run(cmd, cwd=str(_PROJECT_ROOT), env=env, capture_output=False)
        if result.returncode != 0:
            raise RuntimeError(f"label_for_distill.py exited with {result.returncode}")

        labeled_path = labeled_dir / "train_labeled.json"
        out = {"labeled_version": version, "labeled_path": str(labeled_path)}
        for name, fn in [("val_labeled_path", "val_labeled.json"), ("test_labeled_path", "test_labeled.json")]:
            p = labeled_dir / fn
            if p.exists():
                out[name] = str(p)
        return out
    finally:
        print("Cleaning up pod:", pod_id)
        try:
            client.delete_pod(pod_id)
        except Exception:
            # Never let a cleanup failure mask the original error; the pod id
            # was printed above so it can be deleted manually.
            logger.exception("Failed to delete pod %s", pod_id)
293+
294+
295+
@flow(name="labeling_with_pod_flow", log_prints=True)
def labeling_with_pod_flow(
    train_path: str,
    val_path: str | None = None,
    test_path: str | None = None,
    openai_cap: int = 500,
    output_labeled_dir: str | Path | None = None,
    pod_wait_timeout_sec: int = 600,
    vllm_ready_timeout_sec: int = 180,
) -> dict:
    """Pod creation -> labeling (OpenAI gold + vLLM teacher) -> pod deletion.

    Implements docs/runpod_cli/cli_strategy.md: "create pod -> wait -> set
    VLLM_POD_BASE_URL and run label_for_distill -> delete pod when done".
    Thin wrapper that normalizes ``output_labeled_dir`` to a string and
    delegates everything to ``labeling_with_pod_task``.
    """
    # Path or str both accepted; the task wants str | None.
    if output_labeled_dir:
        labeled_out = str(output_labeled_dir)
    else:
        labeled_out = None
    return labeling_with_pod_task(
        train_path=train_path,
        val_path=val_path,
        test_path=test_path,
        openai_cap=openai_cap,
        output_labeled_dir=labeled_out,
        pod_wait_timeout_sec=pod_wait_timeout_sec,
        vllm_ready_timeout_sec=vllm_ready_timeout_sec,
    )
318+
319+
188320
@task(name="train-student-task", log_prints=True)
189321
def train_student_task(
190322
labeled_path: str,
@@ -354,8 +486,8 @@ def main() -> None:
354486
parser = argparse.ArgumentParser(description="Prefect flows for summary KD pipeline (distill_by_prefect.md)")
355487
parser.add_argument(
356488
"flow",
357-
choices=["build_dataset", "labeling", "train_student", "evaluate", "all"],
358-
help="Flow to run",
489+
choices=["build_dataset", "labeling", "labeling_with_pod", "train_student", "evaluate", "all"],
490+
help="Flow to run (labeling_with_pod: Pod 생성→라벨링→삭제)",
359491
)
360492
parser.add_argument("--input", type=Path, default=None, help="Input reviews JSON (default: tasteam_app_all_review_data.json)")
361493
parser.add_argument("--out-dir", type=Path, default=None, help="Output root (default: distill_pipeline_output)")
@@ -404,6 +536,35 @@ def main() -> None:
404536
output_labeled_dir=out_dir,
405537
)
406538
print("Result:", result)
539+
elif args.flow == "labeling_with_pod":
540+
if not args.train_path:
541+
parser.error("labeling_with_pod requires --train-path")
542+
ds_dir = out_dir / "datasets"
543+
val_p, test_p = None, None
544+
if args.val_path:
545+
val_p = str(args.val_path)
546+
elif ds_dir.exists():
547+
for d in sorted(ds_dir.iterdir(), reverse=True):
548+
v = d / "val.json"
549+
if v.exists():
550+
val_p = str(v)
551+
break
552+
if args.test_path:
553+
test_p = str(args.test_path)
554+
elif ds_dir.exists():
555+
for d in sorted(ds_dir.iterdir(), reverse=True):
556+
t = d / "test.json"
557+
if t.exists():
558+
test_p = str(t)
559+
break
560+
result = labeling_with_pod_flow(
561+
train_path=str(args.train_path),
562+
val_path=val_p,
563+
test_path=test_p,
564+
openai_cap=args.openai_cap,
565+
output_labeled_dir=out_dir,
566+
)
567+
print("Result:", result)
407568
elif args.flow == "train_student":
408569
if not args.labeled_path:
409570
parser.error("train_student requires --labeled-path")

scripts/runpod_cli/pod_create_delete_cli.py

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,46 @@ def wait_until_running(
6666

6767
raise TimeoutError(f"Pod {pod_id} did not reach RUNNING within {timeout_sec}s. Last: {last}")
6868

69+
@staticmethod
70+
def get_default_pod_payload() -> Dict[str, Any]:
71+
"""vLLM Pod 생성용 기본 payload (distill 라벨링 등에서 재사용)."""
72+
return {
73+
"allowedCudaVersions": ["13.0"],
74+
"cloudType": "SECURE",
75+
"computeType": "GPU",
76+
"containerDiskInGb": 50,
77+
"cpuFlavorPriority": "availability",
78+
"dataCenterIds": [
79+
"EU-RO-1", "CA-MTL-1", "EU-SE-1", "US-IL-1", "EUR-IS-1", "EU-CZ-1", "US-TX-3", "EUR-IS-2",
80+
"US-KS-2", "US-GA-2", "US-WA-1", "US-TX-1", "CA-MTL-3", "EU-NL-1", "US-TX-4", "US-CA-2",
81+
"US-NC-1", "OC-AU-1", "US-DE-1", "EUR-IS-3", "CA-MTL-2", "AP-JP-1", "EUR-NO-1", "EU-FR-1",
82+
"US-KS-3", "US-GA-1",
83+
],
84+
"dataCenterPriority": "availability",
85+
"dockerEntrypoint": [],
86+
"dockerStartCmd": [],
87+
"env": {"ENV_VAR": "value"},
88+
"globalNetworking": False,
89+
"gpuCount": 1,
90+
"gpuTypeIds": ["NVIDIA GeForce RTX 4090"],
91+
"gpuTypePriority": "availability",
92+
"imageName": "jinsoo1218/vllm-pod:latest",
93+
"interruptible": False,
94+
"locked": False,
95+
"minDiskBandwidthMBps": 123,
96+
"minDownloadMbps": 123,
97+
"minRAMPerGPU": 8,
98+
"minUploadMbps": 123,
99+
"minVCPUPerGPU": 2,
100+
"name": "vllm-pod",
101+
"networkVolumeId": "2kn4qj6rql",
102+
"ports": ["8000/http", "22/tcp"],
103+
"supportPublicIp": True,
104+
"vcpuCount": 2,
105+
"volumeInGb": 20,
106+
"volumeMountPath": "/workspace",
107+
}
108+
69109
def _handle_json_response(self, resp: requests.Response) -> Dict[str, Any]:
70110
# 에러 메시지를 보기 좋게
71111
try:
@@ -82,45 +122,9 @@ def _handle_json_response(self, resp: requests.Response) -> Dict[str, Any]:
82122

83123

84124
if __name__ == "__main__":
85-
token = os.environ["RUNPOD_API_KEY"] # 너가 쓰는 키 이름으로 통일
125+
token = os.environ["RUNPOD_API_KEY"]
86126
client = RunPodClient(token)
87-
88-
payload = {
89-
"allowedCudaVersions": ["13.0"],
90-
"cloudType": "SECURE",
91-
"computeType": "GPU",
92-
"containerDiskInGb": 50,
93-
"cpuFlavorPriority": "availability",
94-
"dataCenterIds": [
95-
"EU-RO-1","CA-MTL-1","EU-SE-1","US-IL-1","EUR-IS-1","EU-CZ-1","US-TX-3","EUR-IS-2",
96-
"US-KS-2","US-GA-2","US-WA-1","US-TX-1","CA-MTL-3","EU-NL-1","US-TX-4","US-CA-2",
97-
"US-NC-1","OC-AU-1","US-DE-1","EUR-IS-3","CA-MTL-2","AP-JP-1","EUR-NO-1","EU-FR-1",
98-
"US-KS-3","US-GA-1"
99-
],
100-
"dataCenterPriority": "availability",
101-
"dockerEntrypoint": [],
102-
"dockerStartCmd": [],
103-
"env": {"ENV_VAR": "value"},
104-
"globalNetworking": False,
105-
"gpuCount": 1,
106-
"gpuTypeIds": ["NVIDIA GeForce RTX 4090"],
107-
"gpuTypePriority": "availability",
108-
"imageName": "jinsoo1218/vllm-pod:latest",
109-
"interruptible": False,
110-
"locked": False,
111-
"minDiskBandwidthMBps": 123,
112-
"minDownloadMbps": 123,
113-
"minRAMPerGPU": 8,
114-
"minUploadMbps": 123,
115-
"minVCPUPerGPU": 2,
116-
"name": "vllm-pod",
117-
"networkVolumeId": "2kn4qj6rql",
118-
"ports": ["8000/http", "22/tcp"],
119-
"supportPublicIp": True,
120-
"vcpuCount": 2,
121-
"volumeInGb": 20,
122-
"volumeMountPath": "/workspace",
123-
}
127+
payload = RunPodClient.get_default_pod_payload()
124128

125129
pod = client.create_pod(payload)
126130
pod_id = pod["id"]

0 commit comments

Comments
 (0)