-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdebugging.py
More file actions
139 lines (114 loc) · 3.7 KB
/
debugging.py
File metadata and controls
139 lines (114 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import subprocess
import os
import uuid
import flyte
from dataclasses import dataclass
from flyte.remote._common import ToJSONMixin
@dataclass
class ProcessResult(ToJSONMixin):
stdout: str
stderr: str
returncode: int
# -----------------
# Tasks Environment
# -----------------
task_env = flyte.TaskEnvironment(
name="union_mcp_script_runner",
resources=flyte.Resources(cpu=3, memory="8Gi", disk="100Gi"),
image=(
flyte.Image
.from_debian_base(name="union-mcp-script-runner")
.with_apt_packages("ca-certificates", "git")
.with_pip_packages(
"uv",
"unionai-reuse",
# "flyteidl2==2.0.1",
# "git+https://github.com/flyteorg/flyte-sdk.git@586131aee116ab36caa024d946ed4eea21830b13",
"git+https://github.com/unionai-oss/union-mcp.git@0520716#egg=union-mcp[v2]",
)
),
# reusable=flyte.ReusePolicy(
# replicas=(1, 2),
# idle_ttl=60,
# ),
)
@task_env.task
async def build_script_image_task(script: str, tail: int = 50) -> ProcessResult:
"""Build the container image for a Flyte script.
This task writes the script to a temporary file and runs the build process
using uv to build the container image for the script.
Args:
script: The Python script content to build.
Returns:
A dict containing stdout, stderr, and returncode from the build process.
"""
filename = f"__build_script_{str(uuid.uuid4())[:16]}__.py"
with open(filename, "w") as f:
f.write(script)
try:
proc = subprocess.run(
[
"/opt/venv/bin/python",
filename,
"--build",
],
capture_output=True,
env=os.environ,
text=True,
)
out = ProcessResult(
stdout="\n".join(proc.stdout.splitlines()[-tail:]),
stderr="\n".join(proc.stderr.splitlines()[-tail:]),
returncode=proc.returncode,
)
return out
finally:
os.remove(filename)
@task_env.task
async def run_script_remote_task(script: str, tail: int = 50) -> ProcessResult:
"""Run a Flyte script remotely on the cluster.
This task writes the script to a temporary file and executes it using uv,
which triggers the script's remote execution logic.
Args:
script: The Python script content to run.
Returns:
A dict containing stdout, stderr, and returncode from the run process.
"""
filename = f"__run_script_{str(uuid.uuid4())[:16]}__.py"
with open(filename, "w") as f:
f.write(script)
try:
proc = subprocess.run(
[
"/opt/venv/bin/python",
filename,
],
capture_output=True,
env=os.environ,
text=True,
)
out = ProcessResult(
stdout="\n".join(proc.stdout.splitlines()[-tail:]),
stderr="\n".join(proc.stderr.splitlines()[-tail:]),
returncode=proc.returncode,
)
return out
finally:
os.remove(filename)
if __name__ == "__main__":
with open("examples/v2/sum_of_squares.py", "r") as f:
script = f.read()
flyte.init_from_config()
run = flyte.with_runcontext(
mode="remote",
env_vars={
"FLYTE_API_KEY": os.environ["FLYTE_API_KEY"],
"FLYTE_ORG": os.environ["FLYTE_ORG"],
"FLYTE_PROJECT": os.environ["FLYTE_PROJECT"],
"FLYTE_DOMAIN": os.environ["FLYTE_DOMAIN"],
"FLYTE_ENDPOINT": os.environ["FLYTE_ENDPOINT"],
}
).run(run_script_remote_task, script=script)
print(run.url)
run.wait()
print(run.outputs()[0])