
Commit 5f906d2

Merge pull request #139 from shazj99/dp_on_yarn
DP on yarn
2 parents f34d7fb + 65e3351 commit 5f906d2

File tree

19 files changed: +1029 -1 lines changed


doc/dpdispatcher_on_yarn.md

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
# Support DPDispatcher on Yarn

## Background

Currently, DPGen (and other DP software) supports HPC systems such as Slurm, PBS, and LSF, as well as cloud machines. In order to run DPGen jobs on ByteDance's internal platform, we need to extend it to support YARN resources. The Hadoop ecosystem is a widely used platform for processing big data, and while developing the new interface we found that it can be implemented with open-source Hadoop components alone. For the convenience of other users, we decided to contribute the code to the open-source community.

## Design

We use DistributedShell and HDFS to implement it. The control flow is shown below:

![image](https://github.com/shazj99/dpdispatcher/blob/yarn/doc/dpgen_yarn.jpg?raw=true)

- Use DistributedShell to submit YARN jobs. This involves generating a shell script and submitting it to a YARN queue.
- Use HDFS to store input files and output results. For performance reasons, we pack the forward files into a tar.gz archive and upload it to an HDFS directory. Accordingly, the task downloads the archive before running and uploads a result archive to HDFS after it finishes (see the sketch after this list).
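The pack-and-upload step can be pictured with a minimal sketch, which is not part of this patch: the helper name `pack_and_upload`, the example task directory and file names are hypothetical, and the `hadoop` CLI is assumed to be on `PATH`. In the patch itself, `HDFSContext.upload` plays this role for a whole submission.

```
import os
import subprocess
import tarfile

def pack_and_upload(task_dir, forward_files, hdfs_root):
    """Pack the forward files of one task directory into a .tgz and push it to HDFS."""
    tgz_name = "%s_upload.tgz" % os.path.basename(os.path.abspath(task_dir))
    with tarfile.open(tgz_name, "w:gz") as tar:
        for fname in forward_files:
            # keep the task directory in the archive path so the container can untar in place
            tar.add(os.path.join(task_dir, fname))
    # `hadoop fs -put -f` overwrites any existing archive under the HDFS root directory
    subprocess.check_call(["hadoop", "fs", "-put", "-f", tgz_name, hdfs_root])

# e.g. pack_and_upload("sys-0001-0015", ["INCAR", "POSCAR", "POTCAR"], "hdfs://path/to/remote/root")
```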

## Implementation

We only need to add two classes, `HDFSContext` and `DistributedShell`:

```
class HDFSContext(BaseContext):
    def upload(self, job_dirs, local_up_files):
        """ upload forward files and forward command files to HDFS root dir

        Parameters
        ----------
        job_dirs : list
            the directories that contain the files to upload
        local_up_files : list
            the file names which will be uploaded

        Returns
        -------
        none
        """
        pass

    def download(self, submission):
        """ download backward files from HDFS root dir

        Parameters
        ----------
        submission : Submission class instance
            represents a collection of tasks, including the backward file names

        Returns
        -------
        none
        """
        pass

    def check_file_exists(self, fname):
        """ check whether the given file exists, often used to check whether the corresponding job has finished

        Parameters
        ----------
        fname : string
            file name to be checked

        Returns
        -------
        status : boolean
        """
        pass
```
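As a point of reference, `check_file_exists` can be backed by `hadoop fs -test -e`, the same command the generated script below uses to probe finish tags. A minimal sketch, for illustration only and not the exact code of this patch (the `hadoop` CLI is assumed to be on `PATH`):

```
import subprocess

def hdfs_file_exists(path):
    # `hadoop fs -test -e` exits with code 0 when the given path exists on HDFS
    return subprocess.call(["hadoop", "fs", "-test", "-e", path]) == 0

# e.g. hdfs_file_exists("/root/uuid/uuid_tag_finished")
```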

```
class DistributedShell(Machine):
    def do_submit(self, job):
        """ submit the job to yarn using distributed shell

        Parameters
        ----------
        job : Job class instance
            job to be submitted

        Returns
        -------
        job_id : string
            usually a yarn application id
        """
        pass

    def check_status(self, job):
        """ check the yarn job status

        Parameters
        ----------
        job : Job class instance
            the submitted job

        Returns
        -------
        status : JobStatus
        """
        pass

    def gen_script_command(self, job):
        """ generate the shell script to be executed in the DistributedShell container

        Parameters
        ----------
        job : Job class instance
            the submitted job

        Returns
        -------
        script : string
            script command string
        """
        pass
```
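The status check combines two signals: the finish tag file on HDFS and the liveness of the local submit process. A simplified sketch of that decision logic follows; the real implementation is in `dpdispatcher/distributed_shell.py` below, and `finish_tag_exists` / `submit_process_alive` are hypothetical boolean inputs.

```
from dpdispatcher.JobStatus import JobStatus

def resolve_status(finish_tag_exists, submit_process_alive):
    # the finish tag is only written after the result archive has been uploaded,
    # so it takes precedence over the state of the submit process
    if finish_tag_exists:
        return JobStatus.finished
    # no tag yet: the job is running as long as the submit process is alive
    if submit_process_alive:
        return JobStatus.running
    return JobStatus.terminated
```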

The following is an example of a generated shell script. It will be executed in a YARN container:
```
#!/bin/bash

## set environment variables
source /opt/intel/oneapi/setvars.sh

## download the tar file from HDFS which contains the forward files
if ! ls uuid_upload_*.tgz 1>/dev/null 2>&1; then
  hadoop fs -get /root/uuid/uuid_upload_*.tgz .
fi
for tgz_file in `ls *.tgz`; do tar xvf $tgz_file; done

## check whether the task has already finished successfully
hadoop fs -test -e /root/uuid/sys-0001-0015/tag_0_finished
{ if [ ! $? -eq 0 ]; then
  cur_dir=`pwd`
  cd sys-0001-0015
  test $? -ne 0 && exit 1

  ## do your job here
  mpirun -n 32 vasp_std 1>> log 2>> err

  if test $? -ne 0; then
    exit 1
  else
    hadoop fs -touchz /root/uuid/sys-0001-0015/tag_0_finished
  fi
  cd $cur_dir
  test $? -ne 0 && exit 1
fi; } &

wait

## upload result files to hdfs
tar czf uuid_download.tar.gz sys-0001-0015
hadoop fs -put -f uuid_download.tar.gz /root/uuid/sys-0001-0015

## mark that the job has finished
hadoop fs -touchz /root/uuid/uuid_tag_finished
```
An example of machine.json is as follows, where `batch_type` is `DistributedShell` and `context_type` is `HDFSContext`:

```
"fp": [
    {
        "command": "mpirun -n 32 vasp_std",
        "machine": {
            "batch_type": "DistributedShell",
            "context_type": "HDFSContext",
            "local_root": "./",
            "remote_root": "hdfs://path/to/remote/root"
        },
        "resources": {
            "number_node": 1,
            "cpu_per_node": 32,
            "gpu_per_node": 0,
            "queue_name": "queue_name",
            "group_size": 1,
            "source_list": ["/opt/intel/oneapi/setvars.sh"],
            "kwargs": {
                "img_name": "",
                "mem_limit": 32,
                "yarn_path": "/path/to/yarn/jars"
            },
            "envs": {
                "HADOOP_HOME": "${HADOOP_HOME:/path/to/hadoop/bin}",
                "CLASSPATH": "`${HADOOP_HOME}/bin/hadoop classpath --glob`",
                "PATH": "${HADOOP_HOME}/bin:${PATH}"
            }
        }
    }
]
```
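With such a configuration, jobs can be driven through the usual dpdispatcher workflow. A rough usage sketch, assuming the standard `Machine`/`Resources`/`Task`/`Submission` API of dpdispatcher; the task directory `sys-0001-0015` and its forward/backward file names are placeholders:

```
from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

# machine and resources mirror the machine.json example above
machine = Machine.load_from_dict({
    "batch_type": "DistributedShell",
    "context_type": "HDFSContext",
    "local_root": "./",
    "remote_root": "hdfs://path/to/remote/root",
})
resources = Resources.load_from_dict({
    "number_node": 1, "cpu_per_node": 32, "gpu_per_node": 0,
    "queue_name": "queue_name", "group_size": 1,
    "source_list": ["/opt/intel/oneapi/setvars.sh"],
    "kwargs": {"img_name": "", "mem_limit": 32, "yarn_path": "/path/to/yarn/jars"},
})

# one VASP task per system directory; file names are placeholders
task = Task(command="mpirun -n 32 vasp_std",
            task_work_path="sys-0001-0015/",
            forward_files=["INCAR", "POSCAR", "POTCAR"],
            backward_files=["OUTCAR", "log", "err"])

submission = Submission(work_base="./", machine=machine, resources=resources,
                        task_list=[task])
submission.run_submission()
```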

dpdispatcher/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -47,12 +47,14 @@
 from .shell import Shell
 from .lsf import LSF
 from .dp_cloud_server import DpCloudServer
+from .distributed_shell import DistributedShell
 from .machine import Machine

 from .lazy_local_context import LazyLocalContext
 from .local_context import LocalContext
 from .ssh_context import SSHContext
 from .dp_cloud_server_context import DpCloudServerContext
+from .hdfs_context import HDFSContext

 def info():
     """

dpdispatcher/distributed_shell.py

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils import run_cmd_with_all_output
import subprocess as sp


shell_script_header_template="""
#!/bin/bash -l
set -x
"""

script_env_template="""
{module_unload_part}
{module_load_part}
{source_files_part}
{export_envs_part}

REMOTE_ROOT=`pwd`
echo 0 > {flag_if_job_task_fail}
test $? -ne 0 && exit 1

if ! ls {submission_hash}_upload.tgz 1>/dev/null 2>&1; then
    hadoop fs -get {remote_root}/*.tgz .
fi
for TGZ in `ls *.tgz`; do tar xvf $TGZ; done

"""

script_end_template="""
cd $REMOTE_ROOT
test $? -ne 0 && exit 1

wait
FLAG_IF_JOB_TASK_FAIL=$(cat {flag_if_job_task_fail})
if test $FLAG_IF_JOB_TASK_FAIL -eq 0; then
    tar czf {submission_hash}_{job_hash}_download.tar.gz {all_task_dirs}
    hadoop fs -put -f {submission_hash}_{job_hash}_download.tar.gz {remote_root}
    hadoop fs -touchz {remote_root}/{job_tag_finished}
else
    exit 1
fi
"""


class DistributedShell(Machine):
    def gen_script_env(self, job):
        source_files_part = ""

        module_unload_part = ""
        module_unload_list = job.resources.module_unload_list
        for ii in module_unload_list:
            module_unload_part += f"module unload {ii}\n"

        module_load_part = ""
        module_list = job.resources.module_list
        for ii in module_list:
            module_load_part += f"module load {ii}\n"

        source_list = job.resources.source_list
        for ii in source_list:
            line = "{ source %s; } \n" % ii
            source_files_part += line

        export_envs_part = ""
        envs = job.resources.envs
        for k, v in envs.items():
            export_envs_part += f"export {k}={v}\n"

        flag_if_job_task_fail = job.job_hash + '_flag_if_job_task_fail'

        script_env = script_env_template.format(
            flag_if_job_task_fail=flag_if_job_task_fail,
            module_unload_part=module_unload_part,
            module_load_part=module_load_part,
            source_files_part=source_files_part,
            export_envs_part=export_envs_part,
            remote_root=self.context.remote_root,
            submission_hash=self.context.submission.submission_hash,
        )
        return script_env

    def gen_script_end(self, job):
        all_task_dirs = ""
        for task in job.job_task_list:
            all_task_dirs += "%s " % task.task_work_path
        job_tag_finished = job.job_hash + '_job_tag_finished'
        flag_if_job_task_fail = job.job_hash + '_flag_if_job_task_fail'
        script_end = script_end_template.format(
            job_tag_finished=job_tag_finished,
            flag_if_job_task_fail=flag_if_job_task_fail,
            all_task_dirs=all_task_dirs,
            remote_root=self.context.remote_root,
            submission_hash=self.context.submission.submission_hash,
            job_hash=job.job_hash
        )
        return script_end

    def gen_script_header(self, job):
        shell_script_header = shell_script_header_template
        return shell_script_header

    def do_submit(self, job):
        """ submit the job to yarn using distributed shell

        Parameters
        ----------
        job : Job class instance
            job to be submitted

        Returns
        -------
        job_id : string
            submit process id
        """

        script_str = self.gen_script(job)
        script_file_name = job.script_file_name
        job_id_name = job.job_hash + '_job_id'
        output_name = job.job_hash + '.out'
        self.context.write_file(fname=script_file_name, write_str=script_str)

        # build the DistributedShell client command; queue, docker image, memory
        # and vcores come from the job resources
        resources = job.resources
        submit_command = 'hadoop jar %s/hadoop-yarn-applications-distributedshell-*.jar ' \
                'org.apache.hadoop.yarn.applications.distributedshell.Client ' \
                '-jar %s/hadoop-yarn-applications-distributedshell-*.jar ' \
                '-queue %s -appname "distributedshell_dpgen_%s" ' \
                '-shell_env YARN_CONTAINER_RUNTIME_TYPE=docker ' \
                '-shell_env YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=%s ' \
                '-shell_env ENV_DOCKER_CONTAINER_SHM_SIZE=\'600m\' ' \
                '-master_memory 1024 -master_vcores 2 -num_containers 1 ' \
                '-container_resources memory-mb=%s,vcores=%s ' \
                '-shell_script /tmp/%s' % (resources.kwargs.get('yarn_path', ''),
                    resources.kwargs.get('yarn_path', ''), resources.queue_name, job.job_hash,
                    resources.kwargs.get('img_name', ''), resources.kwargs.get('mem_limit', 1) * 1024,
                    resources.cpu_per_node, script_file_name)

        # run the client in the background and record its local process id as the job id
        cmd = '{ nohup %s 1>%s 2>%s & } && echo $!' % (submit_command, output_name, output_name)
        ret, stdout, stderr = run_cmd_with_all_output(cmd)

        if ret != 0:
            err_str = stderr.decode('utf-8')
            raise RuntimeError\
                ("Command fails to execute, error message:%s\nreturn code %d\n" % (err_str, ret))
        job_id = int(stdout.decode('utf-8').strip())

        self.context.write_file(job_id_name, str(job_id))
        return job_id

    def check_status(self, job):
        job_id = job.job_id
        if job_id == '':
            return JobStatus.unsubmitted

        # the job id is the pid of the local submit process; check whether it is still alive
        ret, stdout, stderr = run_cmd_with_all_output(f"if ps -p {job_id} > /dev/null; then echo 1; fi")
        if ret != 0:
            err_str = stderr.decode('utf-8')
            raise RuntimeError \
                ("Command fails to execute, error message:%s\nreturn code %d\n" % (err_str, ret))

        if_job_exists = bool(stdout.decode('utf-8').strip())
        # the finish tag file on HDFS is authoritative for successful completion
        if self.check_finish_tag(job=job):
            dlog.info(f"job: {job.job_hash} {job.job_id} finished")
            return JobStatus.finished

        if if_job_exists:
            return JobStatus.running
        else:
            return JobStatus.terminated

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + '_job_tag_finished'
        return self.context.check_file_exists(job_tag_finished)
