Use Base.Semaphore to control test execution parallelism#119
Use Base.Semaphore to control test execution parallelism#119
Conversation
Replace the fixed worker-task-per-slot model with a semaphore-based approach: one task per test, with a Base.Semaphore(jobs) limiting concurrency and a Channel-based worker pool for reuse. This decouples the number of tasks from the parallelism level and simplifies the control flow (no inner while loop, tests array is immutable). Co-authored-by: Claude <noreply@anthropic.com> Made-with: Cursor
src/ParallelTestRunner.jl
Outdated
| for p in workers | ||
| tests_to_start = Threads.Atomic{Int}(length(tests)) | ||
| for test in tests |
There was a problem hiding this comment.
The core change I was interested in was changing this for loop from an iteration over the workers, to an iteration over the tests: the idea for #77 is to have different semaphores for different subsets of tests, and here we iterate over the different subsets (and associated semaphores). The whole execution cycle would then require only this change on this single line, the rest would remain as is.
I still need to fully work out the user-facing changes for designating the serial tests for #77. I have some bad ideas in mind, but I want to decouple them from this refactoring, which I believe is still worth on its own right for making the code easier to follow.
|
Uhm, the test failure is interesting:
The error disappeared after a rerun, I'd say it was a glitch? 😬 Edit: tests on all platforms always passed in the following 3 full reruns. |
|
I pushed a change to use a As a standalone demo of my idea for running the serial tests, followed by the fully concurrent tests, start Julia with 4 threads and run the following code: @time "outer loop" for (tests, semaphore) in ((1:4, Base.Semaphore(1)), (5:12, Base.Semaphore(4)))
@info "running batch" tests semaphore
@time "inner loop - semaphore size $(semaphore.sem_size)" @sync for test in tests
Threads.@spawn Base.acquire(semaphore) do
@show test
sleep(1)
end
end
endYou should observe something like The first batch of "tests" (1 to 4), associated to the semaphore of size 1, is run serially, and takes 4 seconds, the second batch of "tests" (5 to 12), associated to the semaphore of size 4, runs concurrently and takes 2 seconds, for a total of 6 seconds for the overall "test suite". With this design, we could even have multiple semaphores of intermediate sizes between Edit: as a complete demo of my proposal for #77, with the following hacky change diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl
index 31f77f9..106c428 100644
--- a/src/ParallelTestRunner.jl
+++ b/src/ParallelTestRunner.jl
@@ -826,11 +826,6 @@ function runtests(mod::Module, args::ParsedArgs;
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
!isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))")
- sem = Base.Semaphore(max(1, jobs))
- worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs)
- for _ in 1:jobs
- put!(worker_pool, nothing)
- end
t0 = time()
results = []
@@ -1022,9 +1017,15 @@ function runtests(mod::Module, args::ParsedArgs;
#
# execution
#
+ worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs)
+ for _ in 1:jobs
+ put!(worker_pool, nothing)
+ end
tests_to_start = Threads.Atomic{Int}(length(tests))
- @sync for test in tests
+ tests_semaphores = ((tests[1:4], Base.Semaphore(1)), (tests[5:end], Base.Semaphore(max(1, jobs))))
+ for (batch, sem) in tests_semaphores
+ @sync for test in batch
push!(worker_tasks, Threads.@spawn begin
local p = nothing
acquired = false
@@ -1123,6 +1124,7 @@ function runtests(mod::Module, args::ParsedArgs;
end
end)
end
+ end
#
# finalizationrun using ParallelTestRunner
testsuite = Dict(string(l) => :(sleep(1)) for l in 'a':'l');
runtests(ParallelTestRunner, ["--verbose", "--jobs=4"]; testsuite);you should get First batch of 4 tests were run serially, the rest of the tests was run concurrently. This also shows that the workers are still recycled correctly. |
This is a refactoring of the code, to then enable #77 in a follow up PR (CC @christiangnrd).
The idea of using a semaphore is mine, initial implementation is from Claude, but I made a few manual refinements afterwards (some of them folded in the first commit). Overall summary of the changes:
I'm mostly happy about the result, it's very close to what I had in mind, and the net diff is relatively small (+48/-22), so this shouldn't be too hard to review.
#118 was useful because it detected that the case of no tests to run wasn't handled correctly, so yay for the extra tests.