-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathbasic.py
93 lines (72 loc) · 3.38 KB
/
basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""The Basic Scheduler Plugin class. Works under the assumption that you can't a full
node inventory, so Pavilion has to guess (or be told) about node info."""
from abc import ABC
from typing import List
from pavilion.jobs import Job, JobError
from pavilion.status_file import STATES
from pavilion.test_run import TestRun
from pavilion.types import NodeInfo, Nodes, NodeList
from .config import validate_config, calc_node_range
from .scheduler import SchedulerPlugin, SchedulerPluginError
from .vars import SchedulerVariables
class SchedulerPluginBasic(SchedulerPlugin, ABC):
    """A Scheduler plugin that does not support automatic node inventories. It relies
    on manually set parameters in 'schedule.cluster_info'."""

    def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
        """Get the initial variables for the basic scheduler.

        No node inventory is gathered here: the variables are built from the
        given config, an empty node dict, and a single empty chunk.

        :param sched_config: The scheduler config handed to the variable class.
        :returns: A ``self.VAR_CLASS`` instance.
        """

        return self.VAR_CLASS(
            sched_config=sched_config,
            node_info=Nodes({}),
            chunks=[NodeList([])],
            chunk_id=0,
            node_list_id=0,
        )

    def get_final_vars(self, test: TestRun) -> SchedulerVariables:
        """Gather node information from within the allocation.

        :param test: The test run whose 'schedule' config and job are used.
        :returns: A ``self.VAR_CLASS`` instance with ``deferred=False`` whose
            node info, single chunk, and test nodes all cover the selected
            allocation nodes.
        """

        sched_config = validate_config(test.config['schedule'])

        alloc_nodes = self._get_alloc_nodes(test.job)

        # A float 'nodes' value is applied as a fraction of the allocation:
        # only the leading int(len * fraction) nodes are kept (truncated).
        num_nodes = sched_config['nodes']
        if isinstance(num_nodes, float):
            alloc_nodes = alloc_nodes[:int(len(alloc_nodes)*num_nodes)]

        # Look up per-node info for every node that made the cut.
        nodes = Nodes({})
        for node in alloc_nodes:
            nodes[node] = self._get_alloc_node_info(node)

        return self.VAR_CLASS(
            sched_config=sched_config,
            node_info=nodes,
            # The chunk and the test node list are built as separate lists.
            chunks=[NodeList(list(nodes.keys()))],
            chunk_id=0,
            node_list_id=0,
            test_nodes=NodeList(list(nodes.keys())),
            deferred=False,
        )

    def _get_alloc_node_info(self, node_name) -> NodeInfo:
        """Given that this is running on an allocation, get information about
        the given node. While this is completely optional, it can help pavilion
        better populate variables like 'test_min_cpus' and 'test_min_mem'.

        This base implementation ignores its arguments and returns an empty
        NodeInfo.

        :param node_name: The node to describe (unused here).
        :returns: An empty ``NodeInfo``.
        """

        # Reference the arguments so they don't register as unused.
        _ = self, node_name

        return NodeInfo({})

    def schedule_tests(self, pav_cfg, tests: List[TestRun]):
        """Schedule each test independently.

        For every test this creates a job, writes a kickoff script that runs
        'pav _run' for that test, kicks it off, and marks the test as
        SCHEDULED. Scheduling stops at the first test whose job cannot be
        created.

        :param pav_cfg: The Pavilion config, passed through to the job, kickoff
            script, and kickoff calls.
        :param tests: The tests to schedule; each gets its own job.
        :raises SchedulerPluginError: If a job can't be created for a test.
        """

        for test in tests:
            try:
                job = Job.new(pav_cfg, [test], self.KICKOFF_FN)
            except JobError as err:
                # Chain the JobError so it is kept as the explicit cause.
                raise SchedulerPluginError(
                    "Error creating Job: \n{}".format(err)) from err

            sched_config = validate_config(test.config['schedule'])

            # The node range comes from the manually configured node count
            # in 'cluster_info'.
            node_range = calc_node_range(
                sched_config,
                sched_config['cluster_info']['node_count'])

            script = self._create_kickoff_script_stub(
                pav_cfg=pav_cfg,
                job_name='pav test {} ({})'.format(test.full_id, test.name),
                log_path=job.kickoff_log,
                sched_config=sched_config,
                picked_nodes=node_range,
            )

            script.command('pav _run {t.working_dir} {t.id}'.format(t=test))
            script.write(job.kickoff_path)

            # Kick off the written script and record what the kickoff returns.
            job.info = self._kickoff(pav_cfg, job, sched_config)
            test.job = job
            test.status.set(STATES.SCHEDULED,
                            "Test kicked off with the {} scheduler".format(self.name))