Skip to content

Commit ec84d6e

Browse files
authored
add cli to handle terminated submission (#413)
Signed-off-by: Jinzhe Zeng <[email protected]> --------- Signed-off-by: Jinzhe Zeng <[email protected]>
1 parent ad83679 commit ec84d6e

File tree

8 files changed

+151
-4
lines changed

8 files changed

+151
-4
lines changed

.github/workflows/test.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ jobs:
2727
python-version: ${{ matrix.python-version }}
2828
- run: pip install .[test] coverage
2929
- name: Test
30-
run: coverage run --source=./dpdispatcher -m unittest -v && coverage report
30+
run: |
31+
coverage run -p --source=./dpdispatcher -m unittest -v
32+
coverage run -p --source=./dpdispatcher -m dpdispatcher -h
33+
coverage combine
34+
coverage report
3135
- uses: codecov/codecov-action@v3
3236
pass:
3337
needs: [test]

dpdispatcher/__main__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
"""Package dp entry point."""
2+
3+
from dpdispatcher.dpdisp import (
4+
main,
5+
)
6+
7+
if __name__ == "__main__":
8+
main()

dpdispatcher/dpdisp.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import List, Optional
44

55
from dpdispatcher.entrypoints.gui import start_dpgui
6+
from dpdispatcher.entrypoints.submission import handle_submission
67

78

89
def main_parser() -> argparse.ArgumentParser:
@@ -23,6 +24,37 @@ def main_parser() -> argparse.ArgumentParser:
2324
)
2425
subparsers = parser.add_subparsers(title="Valid subcommands", dest="command")
2526
##########################################
27+
# backward
28+
parser_submission = subparsers.add_parser(
29+
"submission",
30+
help="Handle terminated submission.",
31+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
32+
)
33+
parser_submission.add_argument(
34+
"SUBMISSION_HASH",
35+
type=str,
36+
help="Submission hash to download.",
37+
)
38+
parser_submission_action = parser_submission.add_argument_group(
39+
"Actions",
40+
description="One or more actions to take on submission.",
41+
)
42+
parser_submission_action.add_argument(
43+
"--download-terminated-log",
44+
action="store_true",
45+
help="Download log files of terminated tasks.",
46+
)
47+
parser_submission_action.add_argument(
48+
"--download-finished-task",
49+
action="store_true",
50+
help="Download finished tasks.",
51+
)
52+
parser_submission_action.add_argument(
53+
"--clean",
54+
action="store_true",
55+
help="Clean submission.",
56+
)
57+
##########################################
2658
# gui
2759
parser_gui = subparsers.add_parser(
2860
"gui",
@@ -67,7 +99,14 @@ def parse_args(args: Optional[List[str]] = None):
6799

68100
def main():
69101
args = parse_args()
70-
if args.command == "gui":
102+
if args.command == "submission":
103+
handle_submission(
104+
submission_hash=args.SUBMISSION_HASH,
105+
download_terminated_log=args.download_terminated_log,
106+
download_finished_task=args.download_finished_task,
107+
clean=args.clean,
108+
)
109+
elif args.command == "gui":
71110
start_dpgui(
72111
port=args.port,
73112
bind_all=args.bind_all,
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from pathlib import Path
2+
3+
from dpdispatcher.dlog import dlog
4+
from dpdispatcher.submission import Submission
5+
from dpdispatcher.utils.job_status import JobStatus
6+
from dpdispatcher.utils.record import record
7+
8+
9+
def handle_submission(
10+
*,
11+
submission_hash: str,
12+
download_terminated_log: bool = False,
13+
download_finished_task: bool = False,
14+
clean: bool = False,
15+
):
16+
"""Handle terminated submission.
17+
18+
Parameters
19+
----------
20+
submission_hash : str
21+
Submission hash to download.
22+
download_terminated_log : bool, optional
23+
Download log files of terminated tasks.
24+
download_finished_task : bool, optional
25+
Download finished tasks.
26+
clean : bool, optional
27+
Clean submission.
28+
29+
Raises
30+
------
31+
ValueError
32+
At least one action should be specified.
33+
"""
34+
if int(download_terminated_log) + int(download_finished_task) + int(clean) == 0:
35+
raise ValueError("At least one action should be specified.")
36+
37+
submission_file = record.get_submission(submission_hash)
38+
submission = Submission.submission_from_json(str(submission_file))
39+
submission.belonging_tasks = [
40+
task for job in submission.belonging_jobs for task in job.job_task_list
41+
]
42+
# TODO: for unclear reason, the submission_hash may be changed
43+
submission.submission_hash = submission_hash
44+
submission.machine.context.bind_submission(submission)
45+
submission.update_submission_state()
46+
47+
terminated_tasks = []
48+
finished_tasks = []
49+
for task in submission.belonging_tasks:
50+
task.get_task_state(submission.machine.context)
51+
if task.task_state == JobStatus.terminated:
52+
terminated_tasks.append(task)
53+
elif task.task_state == JobStatus.finished:
54+
finished_tasks.append(task)
55+
submission.belonging_tasks = []
56+
57+
if download_terminated_log:
58+
for task in terminated_tasks:
59+
task.backward_files = [task.outlog, task.errlog]
60+
submission.belonging_tasks += terminated_tasks
61+
if download_finished_task:
62+
submission.belonging_tasks += finished_tasks
63+
64+
submission.download_jobs()
65+
66+
if download_terminated_log:
67+
terminated_log_files = []
68+
for task in terminated_tasks:
69+
assert submission.local_root is not None
70+
terminated_log_files.append(
71+
Path(submission.local_root) / task.task_work_path / task.outlog
72+
)
73+
terminated_log_files.append(
74+
Path(submission.local_root) / task.task_work_path / task.errlog
75+
)
76+
77+
dlog.info(
78+
"Terminated logs are downloaded into:\n "
79+
+ "\n ".join([str(f) for f in terminated_log_files])
80+
)
81+
82+
if clean:
83+
submission.clean_jobs()

dpdispatcher/submission.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ def handle_unexpected_submission_state(self):
364364
f"Debug information: remote_root=={self.machine.context.remote_root}.\n"
365365
f"Debug information: submission_hash=={self.submission_hash}.\n"
366366
f"Please check error messages above and in remote_root. "
367-
f"The submission information is saved in {str(record_path)}."
367+
f"The submission information is saved in {str(record_path)}.\n"
368+
f"For furthur actions, run the following command with proper flags: dpdisp submission {self.submission_hash}"
368369
) from e
369370

370371
def check_ratio_unfinished(self, ratio_unfinished: float) -> bool:

tests/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dpdispatcher.contexts.lazy_local_context import LazyLocalContext # noqa: F401
1212
from dpdispatcher.contexts.local_context import LocalContext # noqa: F401
1313
from dpdispatcher.contexts.ssh_context import SSHContext, SSHSession # noqa: F401
14+
from dpdispatcher.entrypoints.submission import handle_submission # noqa: F401
1415
from dpdispatcher.machine import Machine # noqa: F401
1516
from dpdispatcher.machines.distributed_shell import DistributedShell # noqa: F401
1617
from dpdispatcher.machines.dp_cloud_server import Lebesgue # noqa: F401

tests/test_cli.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@
55
class TestCLI(unittest.TestCase):
66
def test_cli(self):
77
sp.check_output(["dpdisp", "-h"])
8-
for subcommand in ("gui",):
8+
for subcommand in (
9+
"submission",
10+
"gui",
11+
):
912
sp.check_output(["dpdisp", subcommand, "-h"])

tests/test_run_submission.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
Resources,
1515
Submission,
1616
Task,
17+
handle_submission,
1718
record,
1819
setUpModule, # noqa: F401
1920
)
@@ -134,6 +135,13 @@ def test_failed_submission(self):
134135
if sys.platform == "linux":
135136
self.assertTrue(err_msg in traceback.format_exc())
136137
self.assertTrue(record.get_submission(submission.submission_hash).is_file())
138+
# post processing
139+
handle_submission(
140+
submission_hash=submission.submission_hash,
141+
download_finished_task=True,
142+
download_terminated_log=True,
143+
clean=True,
144+
)
137145

138146
def test_async_run_submission(self):
139147
machine = Machine.load_from_dict(self.machine_dict)

0 commit comments

Comments
 (0)