-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
98 lines (83 loc) · 3.77 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import re
import shlex
import signal
import subprocess
import time
import traceback
from abc import ABC, abstractmethod
from threading import Thread
class Process(Thread, ABC):
    """Abstract base for a job that runs on its own thread.

    Subclasses supply :meth:`run` (the thread body) and :meth:`terminate`.

    Args:
        name: Thread name assigned to this job.
    """

    def __init__(self, name):
        # Thread.__init__ has to run first: Thread's ``name`` setter refuses
        # to work on an uninitialised thread.
        super().__init__()
        self.name = name
        self.required_gpus = 0  # GPUs this job needs; subclasses may change it
        self.start_time = None  # time when the process started

    @abstractmethod
    def run(self):
        """Thread body: carry out the job."""

    @abstractmethod
    def terminate(self):
        """Stop the job."""
class CondaProcess(Process):
    """Run a user bash script with a conda environment's python, on a thread.

    On construction, the script at ``./tmp/tmp_script_{name}.sh`` is rewritten
    in place: ``cd <base>`` is prepended and ``python`` commands are pointed at
    the interpreter of conda env *env_name*. :meth:`run` executes it with
    ``bash`` in a new session (so :meth:`terminate` can kill its process
    group), sends stdout and stderr to one output file, and deletes the
    temporary script when it finishes.

    Args:
        name: Thread name; also selects the temporary script file.
        logger: Object exposing ``info`` and ``error``, used for status output.
        env_name: Conda environment name (``"base"`` selects the root env).
        script_path: Original script path; used in log messages and to derive
            the default output file name.
        sid: Identifier prefixed to the default output file name.
        kwargs: Options dict. Keys read: ``base`` (working dir, default
            ``"./"``), ``gpu_num`` (GPUs required, default 0),
            ``allocated_gpus``, ``args``, ``submit_time``, ``output_path``.

    Raises:
        subprocess.CalledProcessError: if ``conda info --base`` fails.
        FileNotFoundError: if ``conda`` or the temporary script is missing.
    """

    # Matches ``python`` used as a command word (also the ``python`` prefix of
    # ``python3`` / ``python3.10``), but not inside longer words or paths such
    # as ``my_python.py``, ``ipython``, ``/usr/bin/python`` or ``python=3.8``.
    _PYTHON_CMD = re.compile(r"(?<![\w./-])python(?=[\d.]*(?:[\s;&|)]|$))")

    def __init__(self, name, logger, env_name, script_path, sid, kwargs):
        super().__init__(name)
        self.logger = logger
        self.env_name = env_name
        self.script_path = script_path
        self.sid = sid
        self.kwargs = kwargs
        self.process = None  # subprocess.Popen handle, set by run()
        # NOTE(review): the script is read from and written back to this same
        # path, so the caller presumably places the job script here first.
        self.tmp_script_path_0 = f"./tmp/tmp_script_{name}.sh"
        self.tmp_script_path = self.tmp_script_path_0
        self.script_base = self.kwargs.get("base", "./")
        self.required_gpus = self.kwargs.get("gpu_num", 0)
        self._rewrite_script(self._conda_python())

    def _conda_python(self):
        """Return the path of the python interpreter in conda env ``env_name``."""
        conda_base = subprocess.check_output(["conda", "info", "--base"], text=True).strip()
        if self.env_name == "base":
            # The root ("base") environment lives at the conda prefix itself,
            # not under envs/.
            return os.path.join(conda_base, "bin", "python")
        return os.path.join(conda_base, "envs", self.env_name, "bin", "python")

    def _rewrite_script(self, python_executable):
        """Prepend ``cd script_base`` and retarget ``python`` commands, in place."""
        with open(self.tmp_script_path_0, "r") as f:
            script_lines = f.readlines()
        executable = shlex.quote(python_executable)
        with open(self.tmp_script_path, "wt") as f:
            f.write(f'cd {self.script_base}\n')
            for line in script_lines:
                # rstrip (not strip) keeps indentation, which heredoc bodies
                # such as inline python code depend on; it still drops "\r".
                line = self._PYTHON_CMD.sub(lambda _m: executable, line.rstrip())
                f.write(line + "\n")

    def _gpu_prefix(self):
        """Return the ``CUDA_VISIBLE_DEVICES=...`` prefix for the command.

        Returns ``""`` when no GPU restriction applies, or ``None`` (after
        logging an error) when fewer GPUs were allocated than required.
        """
        allocated_gpus = self.kwargs.get("allocated_gpus") if self.required_gpus > 0 else []
        if not allocated_gpus:
            # NOTE(review): a job that needs GPUs but got no allocation runs
            # without any CUDA_VISIBLE_DEVICES restriction — confirm intended.
            return ""
        if len(allocated_gpus) < self.required_gpus:
            self.logger.error(f"Not enough free gpus for process: {self.script_path}, need {self.required_gpus} gpus")
            return None
        return f"CUDA_VISIBLE_DEVICES={','.join(map(str, allocated_gpus))}"

    def _output_file(self):
        """Return the stdout/stderr file path, creating its directory if needed."""
        output_path = self.kwargs.get("output_path")
        if output_path is None:
            output_path = f"{self.sid}-{os.path.basename(self.script_path)}.out"
        redirect_outf = os.path.join(self.script_base, output_path)
        out_dir = os.path.dirname(redirect_outf)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        return redirect_outf

    def run(self):
        """Execute the prepared script, wait for it, and always clean up.

        Errors are logged (with traceback) instead of propagating out of the
        thread. The temporary script is removed whether or not the run works.
        """
        try:
            gpus = self._gpu_prefix()
            if gpus is None:
                return
            args = " ".join(map(str, self.kwargs.get("args") or []))
            command = f"{gpus} bash {shlex.quote(self.tmp_script_path)} {args}"
            self.logger.info(f"Running script: {self.script_path} submitted at {self.kwargs.get('submit_time')}")
            self.logger.info(f"Command: {command}")
            redirect_outf = self._output_file()
            if self.start_time is None:
                self.start_time = time.time()
            with open(redirect_outf, "wt") as outf:
                # setsid gives bash its own process group so terminate() can
                # kill it together with children that stay in that group.
                self.process = subprocess.Popen(command, shell=True, stdout=outf, stderr=outf, preexec_fn=os.setsid)
                self.process.wait()
        except Exception as e:
            # format_exc() instead of e.with_traceback(), which requires an
            # argument and would raise TypeError here.
            self.logger.error(f"Error while running script {self.script_path}: {e}\n{traceback.format_exc()}")
        finally:
            self._cleanup()

    def terminate(self):
        """Kill the script's process group if the script is still running."""
        process = getattr(self, "process", None)
        if process is None or process.poll() is not None:
            return
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGKILL)
        except ProcessLookupError:
            # The process exited between poll() and killpg(); nothing to kill.
            return
        self.logger.info(f"Terminated process: {self.script_path}")

    def _cleanup(self):
        """Remove the temporary script and stop the subprocess; safe to repeat."""
        tmp_script_path = getattr(self, "tmp_script_path", None)
        if tmp_script_path is not None:
            try:
                os.remove(tmp_script_path)
            except FileNotFoundError:
                pass
        self.terminate()

    def __del__(self):
        self._cleanup()