Skip to content

Commit a5a3526

Browse files
committed
WIP: Dynamic scheduling
1 parent e1c563a commit a5a3526

File tree

3 files changed

+186
-103
lines changed

3 files changed

+186
-103
lines changed

vunit/test/list.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ def __iter__(self):
6464
def __len__(self):
6565
return len(self._test_suites)
6666

67-
def __getitem__(self, idx):
68-
return self._test_suites[idx]
69-
7067

7168
class TestSuiteWrapper(object):
7269
"""

vunit/test/runner.py

Lines changed: 152 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def __init__( # pylint: disable=too-many-arguments
4545
fail_fast=False,
4646
dont_catch_exceptions=False,
4747
no_color=False,
48+
latest_dependency_updates={},
49+
test_suite_history={},
4850
):
4951
self._lock = threading.Lock()
5052
self._fail_fast = fail_fast
@@ -64,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
6466
self._stderr = sys.stderr
6567
self._dont_catch_exceptions = dont_catch_exceptions
6668
self._no_color = no_color
69+
self._latest_dependency_updates = latest_dependency_updates
70+
self._test_suite_history = test_suite_history
6771

6872
ostools.PROGRAM_STATUS.reset()
6973

@@ -79,7 +83,6 @@ def run(self, test_suites):
7983
"""
8084
Run a list of test suites
8185
"""
82-
8386
if not Path(self._output_path).exists():
8487
os.makedirs(self._output_path)
8588

@@ -98,7 +101,9 @@ def run(self, test_suites):
98101

99102
self._report.set_expected_num_tests(num_tests)
100103

101-
scheduler = TestScheduler(test_suites)
104+
scheduler = TestScheduler(
105+
test_suites, self._num_threads, self._latest_dependency_updates, self._test_suite_history
106+
)
102107

103108
threads = []
104109

@@ -110,16 +115,16 @@ def run(self, test_suites):
110115
sys.stderr = ThreadLocalOutput(self._local, self._stdout)
111116

112117
# Start P-1 worker threads
113-
for _ in range(self._num_threads - 1):
118+
for thread_id in range(1, self._num_threads):
114119
new_thread = threading.Thread(
115120
target=self._run_thread,
116-
args=(write_stdout, scheduler, num_tests, False),
121+
args=(write_stdout, scheduler, num_tests, False, thread_id),
117122
)
118123
threads.append(new_thread)
119124
new_thread.start()
120125

121126
# Run one worker in main thread such that P=1 is not multithreaded
122-
self._run_thread(write_stdout, scheduler, num_tests, True)
127+
self._run_thread(write_stdout, scheduler, num_tests, True, 0)
123128

124129
scheduler.wait_for_finish()
125130

@@ -136,7 +141,7 @@ def run(self, test_suites):
136141
sys.stderr = self._stderr
137142
LOGGER.debug("TestRunner: Leaving")
138143

139-
def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
144+
def _run_thread(self, write_stdout, scheduler, num_tests, is_main, thread_id):
140145
"""
141146
Run worker thread
142147
"""
@@ -145,7 +150,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
145150
while True:
146151
test_suite = None
147152
try:
148-
test_suite = scheduler.next()
153+
test_suite = scheduler.next(thread_id)
149154

150155
output_path = self._get_output_path(test_suite.name)
151156
output_file_name = str(Path(output_path) / "output.txt")
@@ -171,7 +176,7 @@ def _run_thread(self, write_stdout, scheduler, num_tests, is_main):
171176

172177
finally:
173178
if test_suite is not None:
174-
scheduler.test_done()
179+
scheduler.test_done(thread_id)
175180

176181
def _get_output_path(self, test_suite_name):
177182
"""
@@ -403,35 +408,163 @@ class TestScheduler(object):
403408
Schedule tests to different threads
404409
"""
405410

406-
def __init__(self, tests):
411+
def _create_test_suite_sets(self, test_suites):
412+
"""
413+
Create static priority based on test result and file change history.
414+
"""
415+
416+
# Test suites are divided into sets which are executed in order. The internal order within a set
417+
# is decided dynamically at run-time.
418+
#
419+
# The 5 sets contains:
420+
#
421+
# 0. Test suites that failed before and for which no updates have been made. Expected to fail again.
422+
# 1. Test suites that failed before but updates have been made that can change that.
423+
# 2. Test suites without a history. New tests are more likely to fail and should be executed early.
424+
# 3. Test suites that passed before but depends on updates. There is a risk that we've introduced new bugs.
425+
# 4. Test suites that passed before and for which there are no updates. They are expected to pass again.
426+
#
427+
# Within sets, test suites are sorted in execution time order starting with the fastest test.
428+
# This is in preparation for the dynamic scheduling that decides the final order within a set.
429+
# The exception is set 2 which has no history of execution time.
430+
431+
# A test suite set keeps the sorted test suite list as well as the total estimated execution time (if available)
432+
# for the test suites within the list.
433+
test_suite_sets = []
434+
for idx in range(5):
435+
test_suite_sets.append(dict(test_suites=[], total_exec_time=None if idx == 2 else 0))
436+
437+
for test_suite in test_suites:
438+
test_suite_data = self._test_suite_history.get(test_suite.name, False)
439+
if not test_suite_data:
440+
test_suite_sets[2]["test_suites"].append(dict(test_suite=test_suite, exec_time=None))
441+
else:
442+
# Test suites with multiple tests are placed in the set where the highest priority test belongs
443+
highest_priority_set = None
444+
exec_time = 0
445+
for test_name in test_suite.test_names:
446+
test_data = test_suite_data.get(test_name, False)
447+
exec_time += test_data["total_time"]
448+
if not test_data:
449+
set_idx = 2
450+
else:
451+
updated_dependency = (
452+
self._latest_dependency_updates[test_suite.file_name] > test_data["start_time"]
453+
)
454+
if test_data["failed"]:
455+
if not updated_dependency:
456+
set_idx = 0
457+
else:
458+
set_idx = 1
459+
elif test_data["skipped"]:
460+
set_idx = 2
461+
elif updated_dependency:
462+
set_idx = 3
463+
else:
464+
set_idx = 4
465+
466+
highest_priority_set = set_idx if not highest_priority_set else min(highest_priority_set, set_idx)
467+
468+
test_suite_sets[highest_priority_set]["test_suites"].append(
469+
dict(test_suite=test_suite, exec_time=exec_time)
470+
)
471+
test_suite_sets[highest_priority_set]["total_exec_time"] += exec_time
472+
473+
for idx, test_suite_set in enumerate(test_suite_sets):
474+
if idx == 2:
475+
continue
476+
test_suite_set["test_suites"].sort(key=lambda item: item["exec_time"])
477+
478+
return test_suite_sets
479+
480+
def __init__(self, test_suites, num_threads, latest_dependency_updates, test_suite_history):
481+
self._num_threads = num_threads
482+
self._latest_dependency_updates = latest_dependency_updates
483+
self._test_suite_history = test_suite_history
484+
self._test_suite_sets = self._create_test_suite_sets(test_suites)
407485
self._lock = threading.Lock()
408-
self._tests = tests
409-
self._idx = 0
486+
self._num_tests = sum(len(test_suite_set["test_suites"]) for test_suite_set in self._test_suite_sets)
410487
self._num_done = 0
488+
self._thread_status = [dict(start_time=None, exec_time=None) for _ in range(num_threads)]
411489

412-
def next(self):
490+
# Estimate remaing test time
491+
self._exec_time_for_remaining_tests = 0
492+
for test_suite_set in self._test_suite_sets:
493+
if total_exec_time := test_suite_set["total_exec_time"]:
494+
self._exec_time_for_remaining_tests += total_exec_time
495+
496+
def next(self, thread_id):
    """
    Return the next test

    Picks from the first non-empty test suite set. Within that set the suite with the longest
    estimated execution time is taken when postponing it risks exceeding the ideal completion
    time, otherwise the first (shortest) one is taken. Raises StopIteration when every set is empty.
    """
    ostools.PROGRAM_STATUS.check_for_shutdown()
    with self._lock:  # pylint: disable=not-context-manager

        def remaining_time(thread_status):
            # Estimated time left for an ongoing test, never negative
            return max(0, thread_status["start_time"] + thread_status["exec_time"] - time.time())

        # Get the first non-empty test suite set or raise StopIteration
        current_set = next(candidate for candidate in self._test_suite_sets if candidate["test_suites"])

        # Estimate time to completion if we can achieve perfect load-balancing of threads
        ongoing_exec_time = sum(
            remaining_time(thread_status)
            for thread_status in self._thread_status
            if thread_status["start_time"] and thread_status["exec_time"]
        )
        ideal_time_to_completion = (ongoing_exec_time + self._exec_time_for_remaining_tests) / self._num_threads

        # Estimate when next thread will complete
        next_thread_completion = None
        for other_id, thread_status in enumerate(self._thread_status):
            if other_id == thread_id:
                continue

            if not thread_status["start_time"]:
                # Thread is not running a test
                next_thread_completion = 0
            elif thread_status["exec_time"]:
                this_completion = remaining_time(thread_status)
                if next_thread_completion is None:
                    next_thread_completion = this_completion
                else:
                    next_thread_completion = min(next_thread_completion, this_completion)

        # Select the longest test if delaying it would risk exceeding the ideal time to completion.
        pending = current_set["test_suites"]
        longest_exec_time = pending[-1]["exec_time"]
        take_longest = (longest_exec_time is not None) and (
            ideal_time_to_completion <= longest_exec_time
            or (
                next_thread_completion is not None
                and ideal_time_to_completion - next_thread_completion <= longest_exec_time
            )
        )
        selected = pending.pop(-1 if take_longest else 0)

        exec_time = selected["exec_time"]
        if exec_time:
            self._exec_time_for_remaining_tests -= exec_time

        my_status = self._thread_status[thread_id]
        my_status["exec_time"] = exec_time
        my_status["start_time"] = time.time()

        return selected["test_suite"]
424555

425-
def test_done(self):
556+
def test_done(self, thread_id):
426557
"""
427558
Signal that a test has been done
428559
"""
429560
with self._lock: # pylint: disable=not-context-manager
561+
self._thread_status[thread_id]["start_time"] = None
562+
self._thread_status[thread_id]["exec_time"] = None
430563
self._num_done += 1
431564

432565
def is_finished(self):
433566
with self._lock: # pylint: disable=not-context-manager
434-
return self._num_done >= len(self._tests)
567+
return self._num_done >= self._num_tests
435568

436569
def wait_for_finish(self):
437570
"""

0 commit comments

Comments
 (0)