Skip to content

Commit 14089fb

Browse files
committed
WIP: Dynamic scheduling
1 parent e1c563a commit 14089fb

File tree

3 files changed

+166
-103
lines changed

3 files changed

+166
-103
lines changed

vunit/test/list.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ def __iter__(self):
6464
def __len__(self):
6565
return len(self._test_suites)
6666

67-
def __getitem__(self, idx):
68-
return self._test_suites[idx]
69-
7067

7168
class TestSuiteWrapper(object):
7269
"""

vunit/test/runner.py

Lines changed: 132 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def __init__( # pylint: disable=too-many-arguments
4545
fail_fast=False,
4646
dont_catch_exceptions=False,
4747
no_color=False,
48+
latest_dependency_updates={},
49+
test_suite_history={},
4850
):
4951
self._lock = threading.Lock()
5052
self._fail_fast = fail_fast
@@ -64,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
6466
self._stderr = sys.stderr
6567
self._dont_catch_exceptions = dont_catch_exceptions
6668
self._no_color = no_color
69+
self._latest_dependency_updates = latest_dependency_updates
70+
self._test_suite_history = test_suite_history
6771

6872
ostools.PROGRAM_STATUS.reset()
6973

@@ -79,7 +83,6 @@ def run(self, test_suites):
7983
"""
8084
Run a list of test suites
8185
"""
82-
8386
if not Path(self._output_path).exists():
8487
os.makedirs(self._output_path)
8588

@@ -98,7 +101,9 @@ def run(self, test_suites):
98101

99102
self._report.set_expected_num_tests(num_tests)
100103

101-
scheduler = TestScheduler(test_suites)
104+
scheduler = TestScheduler(
105+
test_suites, self._num_threads, self._latest_dependency_updates, self._test_suite_history
106+
)
102107

103108
threads = []
104109

@@ -110,16 +115,16 @@ def run(self, test_suites):
110115
sys.stderr = ThreadLocalOutput(self._local, self._stdout)
111116

112117
# Start P-1 worker threads
113-
for _ in range(self._num_threads - 1):
118+
for thread_id in range(1, self._num_threads):
114119
new_thread = threading.Thread(
115120
target=self._run_thread,
116-
args=(write_stdout, scheduler, num_tests, False),
121+
args=(write_stdout, scheduler, num_tests, False, thread_id),
117122
)
118123
threads.append(new_thread)
119124
new_thread.start()
120125

121126
# Run one worker in main thread such that P=1 is not multithreaded
122-
self._run_thread(write_stdout, scheduler, num_tests, True)
127+
self._run_thread(write_stdout, scheduler, num_tests, True, 0)
123128

124129
scheduler.wait_for_finish()
125130

@@ -136,7 +141,7 @@ def run(self, test_suites):
136141
sys.stderr = self._stderr
137142
LOGGER.debug("TestRunner: Leaving")
138143

139-
def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
144+
def _run_thread(self, write_stdout, scheduler, num_tests, is_main, thread_id):
140145
"""
141146
Run worker thread
142147
"""
@@ -145,7 +150,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
145150
while True:
146151
test_suite = None
147152
try:
148-
test_suite = scheduler.next()
153+
test_suite = scheduler.next(thread_id)
149154

150155
output_path = self._get_output_path(test_suite.name)
151156
output_file_name = str(Path(output_path) / "output.txt")
@@ -171,7 +176,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
171176

172177
finally:
173178
if test_suite is not None:
174-
scheduler.test_done()
179+
scheduler.test_done(thread_id)
175180

176181
def _get_output_path(self, test_suite_name):
177182
"""
@@ -403,35 +408,143 @@ class TestScheduler(object):
403408
Schedule tests to different treads
404409
"""
405410

406-
def __init__(self, tests):
411+
def _create_test_suite_sets(self, test_suites):
412+
"""
413+
Create static priority based on test result and file change history.
414+
"""
415+
416+
# Test suites are divided into sets which are executed in order. The internal order within a set
417+
# is decided dynamically at run-time.
418+
#
419+
# The 5 sets contains:
420+
#
421+
# 0. Test suites that failed before and for which no updates have been made. Expected to fail again.
422+
# 1. Test suites that failed before but updates have been made that can change that.
423+
# 2. Test suites without a history. New tests are more likely to fail and should be run early.
424+
# 3. Test suites that passed before but depends on updates. There is a risk that we've introduced new bugs.
425+
# 4. Test suites that passed before and for which there are no updates. They are expected to pass again.
426+
#
427+
# Within sets, test suites are sorted in execution time order starting with the fastest test.
428+
# This is in preparation for the dynamic scheduling that decides the final order within a set.
429+
# The exception is set 2 which has no history of execution time.
430+
431+
# A test suite set keeps the sorted test suite list as well as the total estimated execution time (if available)
432+
# for the test suites within the list.
433+
test_suite_sets = []
434+
for idx in range(5):
435+
test_suite_sets.append(dict(test_suites=[], total_exec_time=None if idx == 2 else 0))
436+
437+
for test_suite in test_suites:
438+
test_suite_data = self._test_suite_history.get(test_suite.name, False)
439+
if not test_suite_data:
440+
test_suite_sets[2]["test_suites"].append(dict(test_suite=test_suite, exec_time=None))
441+
else:
442+
highest_priority_set = None
443+
exec_time = 0
444+
for test_name in test_suite.test_names:
445+
test_data = test_suite_data.get(test_name, False)
446+
exec_time += test_data["total_time"]
447+
if not test_data:
448+
set_idx = 2
449+
else:
450+
# Test suites with multiple tests are placed in the set where the highest priority test belongs
451+
updated_dependency = (
452+
self._latest_dependency_updates[test_suite.file_name] > test_data["start_time"]
453+
)
454+
if test_data["failed"]:
455+
if not updated_dependency:
456+
set_idx = 0
457+
else:
458+
set_idx = 1
459+
elif test_data["skipped"]:
460+
set_idx = 2
461+
elif updated_dependency:
462+
set_idx = 3
463+
else:
464+
set_idx = 4
465+
466+
highest_priority_set = set_idx if not highest_priority_set else min(highest_priority_set, set_idx)
467+
468+
test_suite_sets[highest_priority_set]["test_suites"].append(
469+
dict(test_suite=test_suite, exec_time=exec_time)
470+
)
471+
test_suite_sets[highest_priority_set]["total_exec_time"] += exec_time
472+
473+
for idx, test_suite_set in enumerate(test_suite_sets):
474+
if idx == 2:
475+
continue
476+
test_suite_set["test_suites"].sort(key=lambda item: item["exec_time"])
477+
478+
return test_suite_sets
479+
480+
def __init__(self, test_suites, num_threads, latest_dependency_updates, test_suite_history):
481+
self._num_threads = num_threads
482+
self._latest_dependency_updates = latest_dependency_updates
483+
self._test_suite_history = test_suite_history
484+
self._test_suite_sets = self._create_test_suite_sets(test_suites)
407485
self._lock = threading.Lock()
408-
self._tests = tests
409-
self._idx = 0
486+
self._num_tests = sum(len(test_suite_set["test_suites"]) for test_suite_set in self._test_suite_sets)
410487
self._num_done = 0
488+
self._thread_status = [dict(start_time=None, exec_time=None) for _ in range(num_threads)]
411489

412-
def next(self):
490+
# Estimate remaing test time
491+
self._exec_time_for_remaining_tests = 0
492+
for test_suite_set in self._test_suite_sets:
493+
if total_exec_time := test_suite_set["total_exec_time"]:
494+
self._exec_time_for_remaining_tests += total_exec_time
495+
496+
def next(self, thread_id):
413497
"""
414498
Return the next test
415499
"""
416500
ostools.PROGRAM_STATUS.check_for_shutdown()
417501
with self._lock: # pylint: disable=not-context-manager
418-
if self._idx < len(self._tests):
419-
idx = self._idx
420-
self._idx += 1
421-
return self._tests[idx]
502+
# Get the first non-empty test suite set or raise StopIteration
503+
test_suite_set = next(
504+
(test_suite_set for test_suite_set in self._test_suite_sets if test_suite_set["test_suites"])
505+
)
506+
507+
# Estimate time to completion if we can achieve perfect load-balancing of threads
508+
remaining_exec_time_for_ongoing_tests = 0
509+
for status in self._thread_status:
510+
if status["start_time"] and status["exec_time"]:
511+
remaining_exec_time_for_ongoing_tests += min(
512+
0, status["start_time"] + status["exec_time"] - time.time()
513+
)
514+
515+
time_to_completion = (
516+
remaining_exec_time_for_ongoing_tests + self._exec_time_for_remaining_tests
517+
) / self._num_threads
518+
519+
# Select the longest test if delaying it would risk exceeding the ideal time to completion.
520+
# Add some margin to compensate for small variations in actual execution time.
521+
test_suites = test_suite_set["test_suites"]
522+
longest_test_exec_time = test_suites[-1]["exec_time"]
523+
if longest_test_exec_time is not None and longest_test_exec_time >= 0.9 * time_to_completion:
524+
test_suite_data = test_suites.pop(-1)
525+
else:
526+
test_suite_data = test_suites.pop(0)
527+
528+
if exec_time := test_suite_data["exec_time"]:
529+
self._exec_time_for_remaining_tests -= exec_time
530+
531+
self._thread_status[thread_id]["exec_time"] = exec_time
532+
self._thread_status[thread_id]["start_time"] = time.time()
422533

423-
raise StopIteration
534+
return test_suite_data["test_suite"]
424535

425-
def test_done(self):
536+
def test_done(self, thread_id):
426537
"""
427538
Signal that a test has been done
428539
"""
429540
with self._lock: # pylint: disable=not-context-manager
541+
self._thread_status[thread_id]["start_time"] = None
542+
self._thread_status[thread_id]["exec_time"] = None
430543
self._num_done += 1
431544

432545
def is_finished(self):
433546
with self._lock: # pylint: disable=not-context-manager
434-
return self._num_done >= len(self._tests)
547+
return self._num_done >= self._num_tests
435548

436549
def wait_for_finish(self):
437550
"""

0 commit comments

Comments
 (0)