Skip to content

Commit 827faaf

Browse files
committed
WIP: Dynamic scheduling
1 parent e1c563a commit 827faaf

File tree

3 files changed

+161
-101
lines changed

3 files changed

+161
-101
lines changed

vunit/test/list.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ def __iter__(self):
6464
def __len__(self):
6565
return len(self._test_suites)
6666

67-
def __getitem__(self, idx):
68-
return self._test_suites[idx]
69-
7067

7168
class TestSuiteWrapper(object):
7269
"""

vunit/test/runner.py

Lines changed: 130 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def __init__( # pylint: disable=too-many-arguments
4545
fail_fast=False,
4646
dont_catch_exceptions=False,
4747
no_color=False,
48+
latest_dependency_updates={},
49+
test_suite_history={},
4850
):
4951
self._lock = threading.Lock()
5052
self._fail_fast = fail_fast
@@ -64,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
6466
self._stderr = sys.stderr
6567
self._dont_catch_exceptions = dont_catch_exceptions
6668
self._no_color = no_color
69+
self._latest_dependency_updates = latest_dependency_updates
70+
self._test_suite_history = test_suite_history
6771

6872
ostools.PROGRAM_STATUS.reset()
6973

@@ -75,11 +79,69 @@ def _is_verbose(self):
7579
def _is_quiet(self):
7680
return self._verbosity == self.VERBOSITY_QUIET
7781

82+
def _create_test_suite_sets(self, test_suites):
83+
# Priority is set to fail fast:
84+
# 1. Tests that failed before and for which no updates have been made. Expected to fail again. Shortest # test first (STF).
85+
# 2. Tests that failed before but updates have been made that can change that. STF.
86+
# 3. Tests without a history. New tests are more likely to fail and should be run early. Without
87+
# any execution time order, there is no further sorting within this group
88+
# 4. Tests that passed before but depends on updates. There is a risk that we've introduced new bugs. STF.
89+
# 5. Tests that passed before and for which there are no updates. They are expected to pass again.
90+
# Longest test first to optimize completion time when running with multiple threads
91+
#
92+
# Each test is given a priority number where the integer part is according to the priority above and
93+
# the decimal part seperates the tests within the group based on execution time.
94+
95+
test_suite_sets = []
96+
for idx in range(5):
97+
test_suite_sets.append(dict(test_suites=[], total_exec_time=None if idx == 2 else 0))
98+
99+
for test_suite in test_suites:
100+
test_suite_data = self._test_suite_history.get(test_suite.name, False)
101+
if not test_suite_data:
102+
test_suite_sets[2]["test_suites"].append(dict(test_suite=test_suite, exec_time=None))
103+
else:
104+
highest_priority = None
105+
exec_time = 0
106+
for test_name in test_suite.test_names:
107+
test_data = test_suite_data.get(test_name, False)
108+
exec_time += test_data["total_time"]
109+
if not test_data:
110+
priority = 3
111+
else:
112+
updated_dependency = (
113+
self._latest_dependency_updates[test_suite.file_name] > test_data["start_time"]
114+
)
115+
if test_data["failed"]:
116+
if not updated_dependency:
117+
priority = 1
118+
else:
119+
priority = 2
120+
elif test_data["skipped"]:
121+
priority = 3
122+
elif updated_dependency:
123+
priority = 4
124+
else:
125+
priority = 5
126+
127+
highest_priority = priority if not highest_priority else min(highest_priority, priority)
128+
129+
test_suite_sets[highest_priority - 1]["test_suites"].append(
130+
dict(test_suite=test_suite, exec_time=exec_time)
131+
)
132+
test_suite_sets[highest_priority - 1]["total_exec_time"] += exec_time
133+
134+
for idx, test_suite_set in enumerate(test_suite_sets):
135+
if idx == 2:
136+
continue
137+
test_suite_set["test_suites"].sort(key=lambda item: item["exec_time"])
138+
139+
return test_suite_sets
140+
78141
def run(self, test_suites):
79142
"""
80143
Run a list of test suites
81144
"""
82-
83145
if not Path(self._output_path).exists():
84146
os.makedirs(self._output_path)
85147

@@ -98,7 +160,9 @@ def run(self, test_suites):
98160

99161
self._report.set_expected_num_tests(num_tests)
100162

101-
scheduler = TestScheduler(test_suites)
163+
test_suite_sets = self._create_test_suite_sets(test_suites)
164+
165+
scheduler = TestScheduler(test_suite_sets, self._num_threads)
102166

103167
threads = []
104168

@@ -110,16 +174,16 @@ def run(self, test_suites):
110174
sys.stderr = ThreadLocalOutput(self._local, self._stdout)
111175

112176
# Start P-1 worker threads
113-
for _ in range(self._num_threads - 1):
177+
for thread_id in range(1, self._num_threads):
114178
new_thread = threading.Thread(
115179
target=self._run_thread,
116-
args=(write_stdout, scheduler, num_tests, False),
180+
args=(write_stdout, scheduler, num_tests, False, thread_id),
117181
)
118182
threads.append(new_thread)
119183
new_thread.start()
120184

121185
# Run one worker in main thread such that P=1 is not multithreaded
122-
self._run_thread(write_stdout, scheduler, num_tests, True)
186+
self._run_thread(write_stdout, scheduler, num_tests, True, 0)
123187

124188
scheduler.wait_for_finish()
125189

@@ -136,7 +200,7 @@ def run(self, test_suites):
136200
sys.stderr = self._stderr
137201
LOGGER.debug("TestRunner: Leaving")
138202

139-
def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
203+
def _run_thread(self, write_stdout, scheduler, num_tests, is_main, thread_id):
140204
"""
141205
Run worker thread
142206
"""
@@ -145,7 +209,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
145209
while True:
146210
test_suite = None
147211
try:
148-
test_suite = scheduler.next()
212+
test_suite = scheduler.next(thread_id)
149213

150214
output_path = self._get_output_path(test_suite.name)
151215
output_file_name = str(Path(output_path) / "output.txt")
@@ -171,7 +235,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
171235

172236
finally:
173237
if test_suite is not None:
174-
scheduler.test_done()
238+
scheduler.test_done(thread_id)
175239

176240
def _get_output_path(self, test_suite_name):
177241
"""
@@ -398,40 +462,86 @@ def flush(self):
398462
self._stdout.flush()
399463

400464

465+
class TestIterator:
466+
def __init__(self, test_suite_sets, n_threads):
467+
self._test_suite_sets = test_suite_sets
468+
self._set_idx = 0
469+
self._n_threads = n_threads
470+
self._thread_status = [dict(start_time=None, exec_time=None) for _ in range(n_threads)]
471+
472+
# Estimate remaing test time
473+
self._exec_time_for_remaining_tests = 0
474+
for test_suite_set in test_suite_sets:
475+
if total_exec_time := test_suite_set["total_exec_time"]:
476+
self._exec_time_for_remaining_tests += total_exec_time
477+
478+
def thread_done(self, thread_id):
479+
self._thread_status[thread_id]["start_time"] = None
480+
self._thread_status[thread_id]["exec_time"] = None
481+
482+
def next(self, thread_id):
483+
# Get the first non-empty test suite set or raise StopIteration
484+
test_suite_set = next(
485+
(test_suite_set for test_suite_set in self._test_suite_sets if test_suite_set["test_suites"])
486+
)
487+
488+
# Estimate time to completion if we can achieve perfect load-balancing over threads
489+
remaining_exec_time_for_ongoing_tests = 0
490+
for status in self._thread_status:
491+
if start_time := status["start_time"]:
492+
remaining_exec_time_for_ongoing_tests += min(0, start_time + status["exec_time"] - time.time())
493+
494+
time_to_completion = (
495+
remaining_exec_time_for_ongoing_tests + self._exec_time_for_remaining_tests
496+
) / self._n_threads
497+
498+
# Select the longest test if delaying it would risk exceeding the ideal time to completion
499+
test_suites = test_suite_set["test_suites"]
500+
longest_test_exec_time = test_suites[-1]["exec_time"]
501+
if longest_test_exec_time is not None and longest_test_exec_time >= 0.9 * time_to_completion:
502+
test_suite_data = test_suites.pop(-1)
503+
else:
504+
test_suite_data = test_suites.pop(0)
505+
506+
if exec_time := test_suite_data["exec_time"]:
507+
self._exec_time_for_remaining_tests -= exec_time
508+
509+
self._thread_status[thread_id]["exec_time"] = exec_time
510+
self._thread_status[thread_id]["start_time"] = time.time()
511+
512+
return test_suite_data["test_suite"]
513+
514+
401515
class TestScheduler(object):
402516
"""
403517
Schedule tests to different treads
404518
"""
405519

406-
def __init__(self, tests):
520+
def __init__(self, test_suite_sets, n_threads):
407521
self._lock = threading.Lock()
408-
self._tests = tests
409-
self._idx = 0
522+
self._num_tests = sum(len(test_suite_set["test_suites"]) for test_suite_set in test_suite_sets)
523+
self._test_iterator = TestIterator(test_suite_sets, n_threads)
410524
self._num_done = 0
411525

412-
def next(self):
526+
def next(self, thread_id):
413527
"""
414528
Return the next test
415529
"""
416530
ostools.PROGRAM_STATUS.check_for_shutdown()
417531
with self._lock: # pylint: disable=not-context-manager
418-
if self._idx < len(self._tests):
419-
idx = self._idx
420-
self._idx += 1
421-
return self._tests[idx]
422-
423-
raise StopIteration
532+
return self._test_iterator.next(thread_id)
424533

425-
def test_done(self):
534+
def test_done(self, thread_id):
426535
"""
427536
Signal that a test has been done
428537
"""
429538
with self._lock: # pylint: disable=not-context-manager
539+
self._test_iterator.thread_done(thread_id)
430540
self._num_done += 1
431541

432542
def is_finished(self):
433543
with self._lock: # pylint: disable=not-context-manager
434-
return self._num_done >= len(self._tests)
544+
return self._num_done >= self._num_tests
435545

436546
def wait_for_finish(self):
437547
"""

0 commit comments

Comments
 (0)