diff --git a/experimental/Parallel/README.md b/experimental/Parallel/README.md
new file mode 100644
index 000000000000..8967e88fbca0
--- /dev/null
+++ b/experimental/Parallel/README.md
@@ -0,0 +1,16 @@
+# Parallel and distributed computations in Oscar
+
+## Simple tools to deploy embarrassingly parallel tasks to workers
+
+The standard Julia methods for parallelization do not work well with Oscar
+datatypes out of the box. The main reason is that the `parent` structure needs
+to be maintained when communicating between different processes. In this
+experimental package we aim to provide support for this in a way that is
+convenient for both users and programmers.
+
+## Status
+
+This is currently work in progress with no guarantee of long-term support.
+However, if you would like to use it, or if the methods provided here do not
+fully serve your needs, feel free to reach out to
+`@antonydellavecchia` and `@HechtiDerLachs`.
diff --git a/experimental/Parallel/docs/doc.main b/experimental/Parallel/docs/doc.main
new file mode 100644
index 000000000000..935144b9b1cd
--- /dev/null
+++ b/experimental/Parallel/docs/doc.main
@@ -0,0 +1,5 @@
+[
+  "Parallel" => [
+    "introduction.md"
+  ]
+]
diff --git a/experimental/Parallel/docs/src/introduction.md b/experimental/Parallel/docs/src/introduction.md
new file mode 100644
index 000000000000..23d05f937072
--- /dev/null
+++ b/experimental/Parallel/docs/src/introduction.md
@@ -0,0 +1,32 @@
+```@meta
+CurrentModule = Oscar
+DocTestSetup = Oscar.doctestsetup()
+```
+
+# Parallelization in Oscar
+
+The functionality in the experimental package `Parallel` provides tools for carrying
+out simple parallel computations in Oscar. The main obstruction to using regular
+functions like `pmap` and friends is that most Oscar datatypes have `parent`s which
+need to be coherent across the worker pool and the main process. To achieve this,
+some additional information on "type parameters" needs to be sent up front when
+communicating with other processes; see the documentation of the
+[Oscar serialization](@ref dev_serialization) for details.
+
+Below we document specialized structs and overloaded methods which aim to integrate
+Oscar data types seamlessly into `pmap` and other functions.
+Note that this is non-exhaustive work in progress and the user always remains
+responsible for making sure that the objects they pass to workers can be serialized!
+
+## `OscarWorkerPool`, `pmap`, and friends
+```@docs
+OscarWorkerPool
+oscar_worker_pool
+```
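+
+For illustration, here is a small usage sketch (the number of workers and the chosen
+polynomials are arbitrary); it mirrors the example in the docstring of `oscar_worker_pool`:
+
+```julia
+# Start two Oscar worker processes; the do-block form shuts them down again
+# once the computation is finished.
+results = oscar_worker_pool(2) do wp
+  R, (x, y) = QQ[:x, :y]
+  # `pmap` works as with a plain `WorkerPool`; the required parent information
+  # is sent to the workers behind the scenes.
+  pmap(f -> f^2, wp, [x + y, x*y, y^2])
+end
+```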
+
+## Slightly advanced distributed computing with centralized bookkeeping
+```@docs
+compute_distributed!
+pop_task!
+process_result!
+```
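+
+To illustrate the expected interface, here is a sketch of a possible context object;
+the name `SquaringContext` and its fields are purely illustrative and not part of Oscar:
+
+```julia
+# Tasks are drawn from `todo`; results are collected in `done`, keyed by task id.
+mutable struct SquaringContext
+  todo::Vector
+  done::Dict{Int, Any}
+  SquaringContext(todo::Vector) = new(todo, Dict{Int, Any}())
+end
+
+# Hand out one task at a time; the current length of `todo` serves as a unique task id.
+Oscar.pop_task!(ctx::SquaringContext) =
+  isempty(ctx.todo) ? nothing : (length(ctx.todo), f -> f^2, pop!(ctx.todo))
+
+# Store each result under its task id.
+Oscar.process_result!(ctx::SquaringContext, id::Int, res) = ctx.done[id] = res
+
+squares = oscar_worker_pool(2) do wp
+  R, (x, y) = QQ[:x, :y]
+  ctx = Oscar.compute_distributed!(SquaringContext(gens(R)), wp)
+  ctx.done
+end
+```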
diff --git a/experimental/Parallel/src/Parallel.jl b/experimental/Parallel/src/Parallel.jl
new file mode 100644
index 000000000000..6a6ab6851078
--- /dev/null
+++ b/experimental/Parallel/src/Parallel.jl
@@ -0,0 +1,272 @@
+@doc raw"""
+    OscarWorkerPool
+
+An `OscarWorkerPool` handles a pool of separate worker processes running Oscar.
+The messaging between processes in an `OscarWorkerPool` uses the Oscar serialization,
+which is needed to implement parallel methods.
+
+The Julia [Distributed](https://docs.julialang.org/en/v1/stdlib/Distributed/)
+functions [`remotecall`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.remotecall-Tuple{Any,%20AbstractWorkerPool,%20Vararg{Any}}),
+[`remotecall_fetch`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.remotecall_fetch-Tuple{Any,%20AbstractWorkerPool,%20Vararg{Any}})
+and [`pmap`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap) work with an `OscarWorkerPool`, which is a subtype of
+[`AbstractWorkerPool`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.AbstractWorkerPool).
+"""
+mutable struct OscarWorkerPool <: AbstractWorkerPool
+  wp::WorkerPool                             # the plain worker pool
+  channel::Channel{Int}                      # required for the `AbstractWorkerPool` interface
+  workers::Set{Int}                          # same
+  wids::Vector{Int}                          # the ids of all workers associated to this pool
+  oscar_channels::Dict{Int, <:RemoteChannel} # channels for sending `type_params` to the workers
+
+  function OscarWorkerPool(n::Int)
+    wids = addprocs(n)
+    wp = WorkerPool(wids)
+    # `@everywhere` can only be used at top level, so we have to use `remotecall_eval` here.
+    remotecall_eval(Main, wids, :(using Oscar))
+    return new(wp, wp.channel, wp.workers, wids, Dict{Int, RemoteChannel}())
+  end
+end
+
+@doc raw"""
+    oscar_worker_pool(n::Int)
+    oscar_worker_pool(f::Function, n::Int)
+
+Create an `OscarWorkerPool` with `n` separate processes running Oscar.
+The second form runs `f` on the pool within a context, so that shutting
+down the worker processes happens automatically afterwards.
+
+# Example
+The following code starts up 3 processes with Oscar, runs a parallel
+computation over each element in an array and then shuts down the processes.
+```
+results = oscar_worker_pool(3) do wp
+  Qxy, (x, y) = QQ[:x, :y]
+  pmap(z -> z^2, wp, [x^2, x*y, x*y^2])
+end
+```
+"""
+oscar_worker_pool(n::Int) = OscarWorkerPool(n)
+
+function oscar_worker_pool(f::Function, n::Int)
+  wp = OscarWorkerPool(n)
+  local results
+  try
+    results = f(wp)
+  finally
+    close!(wp)
+  end
+  return results
+end
+
+### The following implements the `AbstractWorkerPool` interface
+
+# Add a new worker to the pool.
+function push!(wp::OscarWorkerPool, id::Int)
+  # Make sure the node is running Oscar.
+  remotecall_eval(Main, id, :(using Oscar))
+  push!(wp.wids, id) # update the list of associated workers
+  return push!(wp.wp, id)
+end
+
+# Take a worker from the pool; this marks it as being busy.
+function Base.take!(wp::OscarWorkerPool)
+  return take!(wp.wp)
+end
+
+# Put a formerly busy worker back into the pool.
+function Base.put!(wp::OscarWorkerPool, a::Int)
+  return put!(wp.wp, a)
+end
+
+# Get the number of all workers associated to the pool.
+function length(wp::OscarWorkerPool)
+  return length(wp.wp)
+end
+
+# Return whether any worker is available for work.
+function Base.isready(wp::OscarWorkerPool)
+  return isready(wp.wp)
+end
+
+### end of the implementation of the `AbstractWorkerPool` interface
+
+### extra functionality
+
+workers(wp::OscarWorkerPool) = wp.wids
+
+function get_channel(wp::OscarWorkerPool, id::Int; channel_size::Int=1024)
+  return get!(wp.oscar_channels, id) do
+    RemoteChannel(()->Channel{Any}(channel_size), id)
+  end
+end
+
+close!(wp::OscarWorkerPool) = map(rmprocs, workers(wp))
+
+# Send the `type_params` of `a` to all workers in the pool so that functions
+# like `pmap` work with Oscar types.
+function put_type_params(wp::OscarWorkerPool, a::Any)
+  for id in wp.wids
+    put_type_params(get_channel(wp, id), a)
+  end
+end
+
+### distributed computation with centralized data management
+
+@doc raw"""
+    compute_distributed!(ctx, wp::OscarWorkerPool; wait_period=0.1)
+
+Given an `OscarWorkerPool` `wp` and a context object `ctx`, distribute tasks
+coming out of `ctx` to be carried out on the workers (based on availability)
+and process the results.
+
+For this to work, the following methods must be implemented for `ctx`:
+  - `pop_task!(ctx)` to either return a `Tuple` `(task_id::Int, func::Any, args)`, in which case `remotecall(func, wp, args)` is used to deploy the task to the workers, or return `nothing` to indicate that all tasks in `ctx` have been exhausted, or return an instance of `WaitForResults` to indicate that results of tasks which have already been given out are needed before new tasks can be given out.
+  - `process_result!(ctx, task_id::Int, res)` to process the result `res` of the computation of the task with id `task_id`.
+
+Note: The programmer is responsible for providing `task_id`s which are unique for the respective tasks!
+
+Deploying tasks to the workers continues until `pop_task!(ctx)` returns `nothing`.
+Computation continues until all deployed tasks have been treated with `process_result!`.
+The return value is `ctx` in its state after the computation.
+"""
+function compute_distributed!(ctx, wp::OscarWorkerPool; wait_period=0.1)
+  !isready(wp) && error("pool is not ready")
+  is_zero(length(wp)) && error("cannot do computations on an empty pool")
+  futures = Dict{Int, Future}() # task_id => future
+  tasks_exhausted = false
+  while true
+    while isready(wp)
+      next_task = pop_task!(ctx)
+      if isnothing(next_task)
+        tasks_exhausted = true
+        break
+      elseif next_task isa WaitForResults
+        isempty(futures) && error("context object indicates that it is waiting for results, but no `Future`s are present for retrieval")
+        break
+      end
+      task_id, func, args = next_task
+      futures[task_id] = remotecall(func, wp, args)
+    end
+
+    # gather results; iterate over a snapshot so entries can be deleted safely
+    for (task_id, fut) in collect(futures)
+      isready(fut) || continue
+      process_result!(ctx, task_id, fetch(fut))
+      delete!(futures, task_id)
+    end
+
+    # if we're done, we're done
+    tasks_exhausted && isempty(futures) && break
+
+    # prevent spin-locking
+    sleep(wait_period)
+  end
+  return ctx
+end
+
+@doc raw"""
+    WaitForResults
+
+A dummy type so that `pop_task!` can return `WaitForResults()` to indicate
+that no new task can be deployed before results of those already given out
+have been retrieved.
+"""
+struct WaitForResults end
+
+@doc raw"""
+    pop_task!(ctx::CtxType) where {CtxType}
+
+Internal function for `compute_distributed!`; to be overwritten with methods for specific `CtxType`s.
+
+Whatever type of object the user would like to use as context object `ctx` for `compute_distributed!`
+should implement a method of this function which does the following.
+  - If there is another task to be deployed to a worker, return a triple `(task_id, func, args)` consisting of a unique `task_id::Int`, a function `func` and arguments `args` so that `func(args)` is called on some worker.
+  - If no new task can be given out with the current state of the context object `ctx` and we first need to wait for the processing of results of tasks given out already, return `WaitForResults()`.
+  - If all tasks in `ctx` have been exhausted, return `nothing`.
+"""
+pop_task!(ctx) = nothing
+
+@doc raw"""
+    process_result!(ctx::CtxType, id::Int, res) where {CtxType}
+
+Internal function for `compute_distributed!`; to be overwritten with methods for specific `CtxType`s.
+
+For a task `(task_id, func, args)` returned by a call to `pop_task!(ctx)`, the result
+of the call `func(args)` is delivered as `res` for processing on the main process.
+"""
+process_result!(ctx, id::Int, res) = ctx
+
+#=
+# Custom versions of `remotecall` and `remotecall_fetch` for `OscarWorkerPool`s.
+# The idea is to use this special type of worker pool to send the `type_params`
+# of the arguments to the workers up front. Then hopefully, whenever an
+# `OscarWorkerPool` is being used, lots of other functionality like `pmap`
+# will run out of the box.
+=#
+@static if v"1.10" <= VERSION
+  function remotecall(f::Any, wp::OscarWorkerPool, args...; kwargs...)
+    wid = take!(wp)
+    for a in args
+      put_type_params(get_channel(wp, wid), a)
+    end
+    for a in kwargs
+      put_type_params(get_channel(wp, wid), a)
+    end
+
+    # Copied from Distributed.jl/src/workerpool.jl.
+    # This puts the worker back into the pool once the future is ready,
+    # so we do not have to worry about this ourselves.
+    local fut
+    try
+      fut = remotecall(f, wid, args...; kwargs...)
+    catch
+      put!(wp, wid)
+      rethrow()
+    end
+    t = Threads.@spawn Threads.threadpool() try
+      wait(fut)
+    catch
+    finally
+      put!(wp, wid)
+    end
+    errormonitor(t)
+
+    return fut
+  end
+else
+  function remotecall(f::Any, wp::OscarWorkerPool, args...; kwargs...)
+    wid = take!(wp)
+    for a in args
+      put_type_params(get_channel(wp, wid), a)
+    end
+    for a in kwargs
+      put_type_params(get_channel(wp, wid), a)
+    end
+
+    try
+      remotecall(f, wid, args...; kwargs...)
+    finally
+      put!(wp, wid)
+    end
+  end
+end
+
+function remotecall_fetch(f::Any, wp::OscarWorkerPool, args...; kwargs...)
+  wid = take!(wp)
+  for a in args
+    put_type_params(get_channel(wp, wid), a)
+  end
+  for a in kwargs
+    put_type_params(get_channel(wp, wid), a)
+  end
+  # Return the worker to the pool even if the remote call throws.
+  try
+    return remotecall_fetch(f, wid, args...; kwargs...)
+  finally
+    put!(wp, wid)
+  end
+end
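+
+# Example (a sketch, not part of the API): with `wp = oscar_worker_pool(2)` and
+# `R, (x, y) = QQ[:x, :y]`, a call like
+#
+#   remotecall_fetch(f -> f^2, wp, x + y)
+#
+# first sends the `type_params` of the argument to the chosen worker's channel
+# and only then performs the actual remote call, so that the worker can set up
+# a coherent parent for the incoming polynomial.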
+
+export oscar_worker_pool
diff --git a/experimental/Parallel/test/OscarWorkerPool.jl b/experimental/Parallel/test/OscarWorkerPool.jl
new file mode 100644
index 000000000000..bf8070382c3f
--- /dev/null
+++ b/experimental/Parallel/test/OscarWorkerPool.jl
@@ -0,0 +1,40 @@
+using Distributed
+
+# Define an auxiliary context object for the distributed computation and
+# implement the methods for `pop_task!` and `process_result!` for it,
+# as explained in their docstrings.
+mutable struct CtxIO
+  a::Vector
+  b::Dict{Int, Any}
+  function CtxIO(a::Vector)
+    return new(a, Dict{Int, Any}())
+  end
+end
+
+Oscar.pop_task!(ctx::CtxIO) = is_empty(ctx.a) ? nothing : (length(ctx.a), x->x^2, pop!(ctx.a))
+Oscar.process_result!(ctx::CtxIO, id::Int, result) = ctx.b[id] = result
+
+@testset "Parallel" begin
+  # Start a worker pool with Oscar running on the workers.
+  oscar_worker_pool(1) do wp
+    @testset "compute distributed" begin
+      # Create a context for the computation.
+      R, (x, y, z) = QQ[:x, :y, :z]
+      ctx = CtxIO(gens(R))
+
+      # Trigger the distributed execution.
+      Oscar.compute_distributed!(ctx, wp)
+
+      # Verify that the result is correct.
+      @test all(x in keys(ctx.b) for x in [1, 2, 3])
+      @test all(a^2 in values(ctx.b) for a in gens(R))
+    end
+
+    @testset "pmap" begin
+      R, (x, y, z) = GF(101)[:x, :y, :z]
+      @test pmap(f->f^2, wp, gens(R)) == [x^2, y^2, z^2]
+    end
+  end
+end
diff --git a/experimental/Parallel/test/runtests.jl b/experimental/Parallel/test/runtests.jl
new file mode 100644
index 000000000000..ab11bf0ae54f
--- /dev/null
+++ b/experimental/Parallel/test/runtests.jl
@@ -0,0 +1,2 @@
+include("OscarWorkerPool.jl")
+
diff --git a/src/imports.jl b/src/imports.jl
index 8f59c0873510..0bb53c5fd9c6 100644
--- a/src/imports.jl
+++ b/src/imports.jl
@@ -4,7 +4,8 @@ using ProgressMeter: @showprogress
 using Random
 using RandomExtensions
 using UUIDs
-using Distributed: RemoteChannel, Future, remotecall
+using Distributed: RemoteChannel, Future, remotecall, @everywhere, WorkerPool, AbstractWorkerPool, addprocs, rmprocs, remotecall_eval, myid, nworkers
+import Distributed: remotecall, workers, remotecall_fetch

 if VERSION < v"1.11.0-DEV.1562"
   using Compat: allequal, allunique
@@ -50,6 +51,7 @@ import Base:
   one,
   parent,
   print,
+  put!,
   reduce,
   show,
   sum,
diff --git a/test/runtests.jl b/test/runtests.jl
index 186532602329..49a2f28e1381 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -78,7 +78,7 @@ testlist = Oscar._gather_tests("test")

 for exp in Oscar.exppkgs
   path = joinpath(Oscar.oscardir, "experimental", exp, "test")
-  if isdir(path)
+  if isdir(path) && exp != "Parallel"
     append!(testlist, Oscar._gather_tests(path))
   end
 end
@@ -157,6 +157,12 @@ stats = Dict{String,NamedTuple}()
 if test_subset == :long || test_subset == :default
   println("Starting tests for Serialization/IPC.jl")
   push!(stats, Oscar._timed_include("Serialization/IPC.jl", Main))
+
+  if "Parallel" in Oscar.exppkgs
+    path = joinpath(Oscar.oscardir, "experimental", "Parallel", "test", "runtests.jl")
+    println("Starting tests for $path")
+    push!(stats, Oscar._timed_include(path, Main))
+  end
 end

 # if many workers, distribute tasks across them