SolveInParallel with Generators #22225

Open · wants to merge 15 commits into base: master
TODO (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
Bind required_capabilities for mathematical program (see branch conic_standard)
Bind ProgramType for mathematical program (see branch conic_standard)
Bind __eq__, __hash__, __copy__ for constraint in evaluators (see branch conic_standard)
Bind prog.AddConstraint(constraint). Currently this requires prog.AddConstraint(constraint.evaluator(), constraint.variables())

bindings/pydrake/solvers/solvers_py_mathematicalprogram.cc (55 additions, 29 deletions)
@@ -1637,33 +1637,49 @@ void BindFreeFunctions(py::module m) {
// Inside of the lambda we'll demote nullopts back to nullptrs. Note
// that SolverOptions is not necessarily cheap to copy, so we still
// carefully accept it by-pointer. The VectorXd is always necessarily
// copied when going form numpy to Eigen so we still pass it by-value.
// copied when going from numpy to Eigen so we still pass it by-value.
[](std::vector<const MathematicalProgram*> progs,
std::optional<std::vector<std::optional<Eigen::VectorXd>>>
initial_guesses,
std::optional<std::vector<std::optional<SolverOptions*>>>
solver_options,
std::optional<std::vector<std::optional<SolverId>>> solver_ids,
const Parallelism& parallelism, bool dynamic_schedule) {
std::vector<const Eigen::VectorXd*> initial_guesses_ptrs;
if (initial_guesses.has_value()) {
initial_guesses_ptrs.reserve(initial_guesses->size());
for (const auto& guess : *initial_guesses) {
initial_guesses_ptrs.push_back(guess ? &(*guess) : nullptr);
const std::function<const MathematicalProgram*(int64_t, int64_t)>
progs_generator =
[&progs](int64_t, int64_t i) -> const MathematicalProgram* {
return progs.at(i);
};
auto initial_guess_generator =
[&initial_guesses](
int64_t, int64_t i) -> std::optional<Eigen::VectorXd> {
if (initial_guesses.has_value()) {
return initial_guesses->at(i);
} else {
return std::nullopt;
}
}
std::vector<const SolverOptions*> solver_options_ptrs;
if (solver_options.has_value()) {
solver_options_ptrs.reserve(solver_options->size());
for (const auto& option : *solver_options) {
solver_options_ptrs.push_back(option ? *option : nullptr);
};
auto solver_options_generator =
[&solver_options](
int64_t, int64_t i) -> std::optional<SolverOptions> {
if (solver_options.has_value() &&
*(solver_options->at(i)) != nullptr) {
return **(solver_options->at(i));
} else {
return std::nullopt;
}
}
return solvers::SolveInParallel(progs,
initial_guesses.has_value() ? &initial_guesses_ptrs : nullptr,
solver_options.has_value() ? &solver_options_ptrs : nullptr,
solver_ids.has_value() ? &(*solver_ids) : nullptr, parallelism,
dynamic_schedule);
};
auto solver_ids_generator =
[&solver_ids](int64_t, int64_t i) -> std::optional<SolverId> {
if (solver_ids.has_value()) {
return solver_ids->at(i);
} else {
return std::nullopt;
}
};
return solvers::SolveInParallel(progs_generator, 0, ssize(progs),
initial_guess_generator, solver_options_generator,
solver_ids_generator, parallelism, dynamic_schedule);
},
py::arg("progs"), py::arg("initial_guesses") = std::nullopt,
py::arg("solver_options") = std::nullopt,
@@ -1672,7 +1688,7 @@ void BindFreeFunctions(py::module m) {
py::arg("dynamic_schedule") = false,
py::call_guard<py::gil_scoped_release>(),
doc.SolveInParallel
.doc_6args_progs_initial_guesses_solver_options_solver_ids_parallelism_dynamic_schedule)
.doc_6args_conststdvector_conststdvector_conststdvector_conststdvector_drakeParallelism_bool)
.def(
"SolveInParallel",
[](std::vector<const MathematicalProgram*> progs,
@@ -1681,16 +1697,23 @@ void BindFreeFunctions(py::module m) {
const SolverOptions* solver_options,
const std::optional<SolverId>& solver_id,
const Parallelism& parallelism, bool dynamic_schedule) {
std::vector<const Eigen::VectorXd*> initial_guesses_ptrs;
if (initial_guesses.has_value()) {
initial_guesses_ptrs.reserve(initial_guesses->size());
for (const auto& guess : *initial_guesses) {
initial_guesses_ptrs.push_back(guess ? &(*guess) : nullptr);
const std::function<const MathematicalProgram*(int64_t, int64_t)>
progs_generator =
[&progs](int64_t, int64_t i) -> const MathematicalProgram* {
return progs.at(i);
};
auto initial_guess_generator =
[&initial_guesses](
int64_t, int64_t i) -> std::optional<Eigen::VectorXd> {
if (initial_guesses.has_value()) {
return initial_guesses->at(i);
} else {
return std::nullopt;
}
}
return solvers::SolveInParallel(progs,
initial_guesses.has_value() ? &initial_guesses_ptrs : nullptr,
solver_options, solver_id, parallelism, dynamic_schedule);
};
return solvers::SolveInParallel(progs_generator, 0, ssize(progs),
initial_guess_generator, solver_options, solver_id, parallelism,
dynamic_schedule);
},
py::arg("progs"), py::arg("initial_guesses") = std::nullopt,
py::arg("solver_options") = std::nullopt,
@@ -1699,7 +1722,10 @@ void BindFreeFunctions(py::module m) {
py::arg("dynamic_schedule") = false,
py::call_guard<py::gil_scoped_release>(),
doc.SolveInParallel
.doc_6args_progs_initial_guesses_solver_options_solver_id_parallelism_dynamic_schedule);
.doc_6args_conststdvector_conststdvector_constdrakesolversSolverOptions_conststdoptional_drakeParallelism_bool);
// N.B. Do NOT bind the generator forms of SolveInParallel directly. Callbacks
// to Python state from parallel C++ are difficult and likely to lead to
// deadlocks.
}
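
The N.B. above is the key design constraint here. As a hypothetical sketch (not code from this PR, and not pydrake API; py_prog_generator is a placeholder name): if the generator form were bound directly, a user-supplied Python callable would have to be invoked from C++ worker threads, roughly like the helper below.

// Hypothetical only: the pattern the N.B. above warns against.
#include <cstdint>

#include "pybind11/pybind11.h"

#include "drake/solvers/mathematical_program.h"

namespace py = pybind11;

const drake::solvers::MathematicalProgram* CallPythonGenerator(
    const py::function& py_prog_generator, int64_t thread_num, int64_t i) {
  // Every worker thread must re-acquire the GIL before touching Python, which
  // serializes the "parallel" loop and risks deadlock when the outer binding
  // has released the GIL via py::call_guard<py::gil_scoped_release>.
  py::gil_scoped_acquire acquire;
  py::object obj = py_prog_generator(thread_num, i);
  // Lifetime hazard: the returned raw pointer aliases `obj`, whose reference
  // count drops as soon as `obj` goes out of scope.
  return obj.cast<const drake::solvers::MathematicalProgram*>();
}

Avoiding this pattern is why the bindings accept only concrete vectors from Python and adapt them to generators on the C++ side.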

} // namespace
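
For reference, a minimal sketch of calling the generator-based C++ overload that both bindings above forward to. The header paths and exact argument order are assumptions inferred from the calls in this diff, not an authoritative API reference.

// Sketch only: trivial generators over an existing vector of programs,
// mirroring what the binding lambdas above construct internally.
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

#include <Eigen/Core>

#include "drake/common/parallelism.h"  // Assumed header for drake::Parallelism.
#include "drake/solvers/solve.h"       // Assumed header for SolveInParallel.

using drake::solvers::MathematicalProgram;
using drake::solvers::MathematicalProgramResult;

std::vector<MathematicalProgramResult> SolveAllViaGenerators(
    const std::vector<const MathematicalProgram*>& progs) {
  // Map (thread_num, i) to the i-th program; thread_num is unused here.
  const std::function<const MathematicalProgram*(int64_t, int64_t)>
      prog_generator =
          [&progs](int64_t, int64_t i) -> const MathematicalProgram* {
    return progs.at(i);
  };
  // No initial guesses: every index yields nullopt.
  auto guess_generator =
      [](int64_t, int64_t) -> std::optional<Eigen::VectorXd> {
    return std::nullopt;
  };
  return drake::solvers::SolveInParallel(prog_generator,
      /* range_start = */ int64_t{0},
      /* range_end = */ static_cast<int64_t>(progs.size()), guess_generator,
      /* solver_options = */ nullptr, /* solver_id = */ std::nullopt,
      drake::Parallelism::Max(), /* dynamic_schedule = */ false);
}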
geometry/optimization/convex_set.cc (42 additions, 38 deletions)
@@ -122,61 +122,65 @@ bool IsBoundedParallel(const ConvexSet& s, Parallelism parallelism) {
ConstructEmptyBoundednessProgram(&(progs[i]), s);
}

// Pre-allocate empty MathematicalProgramResults for each thread.
std::vector<MathematicalProgramResult> results(parallelism.num_threads());

// Pre-allocate solver instances.
const solvers::SolverId solver_id = solvers::ChooseBestSolver(progs[0]);
std::vector<std::unique_ptr<solvers::SolverInterface>> solver_interfaces(
parallelism.num_threads());

// Pre-allocate the solver options.
std::vector<solvers::SolverOptions> options(parallelism.num_threads());

for (int i = 0; i < parallelism.num_threads(); ++i) {
solver_interfaces[i] = solvers::MakeSolver(solver_id);
options[i].SetOption(solvers::CommonSolverOption::kMaxThreads, 1);
}

std::atomic<bool> certified_unbounded = false;
auto index_to_dimension = [&](const int64_t i) -> int {
return i / 2;
};

// This worker lambda maps the index i to a dimension and direction to check
// boundedness. If unboundedness has already been verified, it just exits
// early. If unboundedness is verified in this iteration, certified_unbounded
// will be updated to reflect that fact. For a given index i, the dimension is
// int(i / 2), and if i % 2 == 0, then we maximize, otherwise, we minimize.
auto solve_ith = [&](const int thread_num, const int64_t i) {
// boundedness. If unboundedness has already been verified, it returns
// nullptr. If unboundedness is verified in this iteration,
// certified_unbounded will be updated to reflect that fact. For a given index
// i, the dimension is int(i / 2), and if i % 2 == 0, then we maximize,
// otherwise, we minimize.
auto make_ith = [&](int64_t thread_num, int64_t i) -> MathematicalProgram* {
if (certified_unbounded.load()) {
return;
// Exit early if unboundedness has already been verified.
return nullptr;
}

const int dimension = i / 2;
const int dimension = index_to_dimension(i);
bool maximize = i % 2 == 0;

// Update the objective vector. By construction, each MathematicalProgram
// has one cost (the linear cost).
progs[thread_num].linear_costs()[0].evaluator()->update_coefficient_entry(
dimension, maximize ? -1 : 1);
DRAKE_ASSERT(progs[thread_num].IsThreadSafe());
solver_interfaces[thread_num]->Solve(progs[thread_num], std::nullopt,
options[thread_num],
&(results[thread_num]));
if (ProgramResultImpliesUnbounded(results[thread_num])) {
certified_unbounded.store(true);
}

// Reset the objective vector.
progs[thread_num].linear_costs()[0].evaluator()->update_coefficient_entry(
dimension, 0);
return &(progs[thread_num]);
};

std::function<void(MathematicalProgram**, const MathematicalProgramResult&,
int64_t, int64_t)>
teardown_ith = [&](MathematicalProgram**,
const MathematicalProgramResult& result_i,
int64_t thread_num, int64_t i) {
const int dimension = index_to_dimension(i);
progs[thread_num]
.linear_costs()[0]
.evaluator()
->update_coefficient_entry(dimension, 0);
if (ProgramResultImpliesUnbounded(result_i)) {
certified_unbounded.store(true);
}
};

// We run the loop from 0 to 2 * s.ambient_dimension(), since two programs are
// solved for each dimension (maximizing and minimizing). All programs are the
// same size, so static scheduling is appropriate.
StaticParallelForIndexLoop(DegreeOfParallelism(parallelism.num_threads()), 0,
2 * s.ambient_dimension(), solve_ith,
ParallelForBackend::BEST_AVAILABLE);

std::vector<MathematicalProgramResult> results =
SolveInParallel<MathematicalProgram*>(
make_ith, // prog_generator
0, // range_start
2 * s.ambient_dimension(), // range_end
[](int64_t, int64_t) -> std::optional<Eigen::VectorXd> {
return std::nullopt;
}, // initial_guesses_generator
nullptr, // solver_options
std::nullopt, // solver_id
Parallelism::Max(), // parallelism
false, // dynamic_schedule
&teardown_ith // prog_teardown
); // NOLINT
return !certified_unbounded.load();
}
} // namespace
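
To make the index bookkeeping in make_ith and teardown_ith concrete, a small worked example (illustrative only, restating the convention described in the comments above): with ambient dimension 2 the loop range is 0..3.

#include <cstdint>

// Each ambient dimension d contributes two indices: 2*d (maximize) and
// 2*d + 1 (minimize).
//   i = 0 -> dimension 0, maximize
//   i = 1 -> dimension 0, minimize
//   i = 2 -> dimension 1, maximize
//   i = 3 -> dimension 1, minimize
int DecodeDimension(int64_t i) { return static_cast<int>(i / 2); }
bool DecodeMaximize(int64_t i) { return i % 2 == 0; }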
geometry/optimization/geodesic_convexity.cc (10 additions, 13 deletions)
@@ -460,26 +460,23 @@ ComputePairwiseIntersections(const ConvexSets& convex_sets_A,
Eigen::MatrixXd Aeq(dimension, 2 * dimension); // Aeq = [-I, I]
Aeq.leftCols(dimension) = -Eigen::MatrixXd::Identity(dimension, dimension);
Aeq.rightCols(dimension) = Eigen::MatrixXd::Identity(dimension, dimension);
std::vector<MathematicalProgram> progs(n_candidates);
std::vector<std::unique_ptr<MathematicalProgram>> progs;
progs.reserve(n_candidates);
for (int i = 0; i < n_candidates; ++i) {
VectorXDecisionVariable x = progs[i].NewContinuousVariables(dimension);
VectorXDecisionVariable y = progs[i].NewContinuousVariables(dimension);
progs.emplace_back(std::make_unique<MathematicalProgram>());
VectorXDecisionVariable x = progs[i]->NewContinuousVariables(dimension);
VectorXDecisionVariable y = progs[i]->NewContinuousVariables(dimension);
// Add x + offset == y by [-I, I][x; y] == [offset]
progs[i].AddLinearEqualityConstraint(Aeq, candidate_edge_offsets[i],
{x, y});
progs[i]->AddLinearEqualityConstraint(Aeq, candidate_edge_offsets[i],
{x, y});
convex_sets_A.at(candidate_edges[i].first)
->AddPointInSetConstraints(&(progs[i]), x);
->AddPointInSetConstraints(progs[i].get(), x);
convex_sets_B.at(candidate_edges[i].second)
->AddPointInSetConstraints(&(progs[i]), y);
->AddPointInSetConstraints(progs[i].get(), y);
}

std::vector<const MathematicalProgram*> prog_ptrs;
prog_ptrs.reserve(n_candidates);
for (int i = 0; i < n_candidates; ++i) {
prog_ptrs.push_back(&(progs[i]));
}
std::vector<MathematicalProgramResult> results = SolveInParallel(
prog_ptrs, nullptr /* initial_guesses */, nullptr /* solver_options */,
progs, nullptr /* initial_guesses */, nullptr /* solver_options */,
std::nullopt /* solver_id */, parallelism, false /* dynamic_schedule */);

std::vector<std::pair<int, int>> edges;
geometry/optimization/graph_of_convex_sets.cc (2 additions, 10 deletions)
@@ -826,13 +826,9 @@ std::set<EdgeId> GraphOfConvexSets::PreprocessShortestPath(
// remainder of the function.
int nE = edge_id_list.size();

// TODO(cohnt): Rewrite with a parallel for loop where each thread creates and
// solves the preprocessing program, to avoid having to use batching together
// with SolveInParallel.

// TODO(cohnt): Rewrite using generators to avoid batching.
// Given an edge (u,v) check if a path from source to u and another from v to
// target exist without sharing edges.

int preprocessing_parallel_batch_size = 1000;
int num_batches = 1 + (nE / preprocessing_parallel_batch_size);
for (int batch_idx = 0; batch_idx < num_batches; ++batch_idx) {
@@ -858,10 +854,6 @@ std::set<EdgeId> GraphOfConvexSets::PreprocessShortestPath(
}

// Check if edge e = (u,v) could be on a path from start to goal.
std::vector<const MathematicalProgram*> prog_ptrs(progs.size());
for (int i = 0; i < ssize(progs); ++i) {
prog_ptrs[i] = progs[i].get();
}
std::optional<solvers::SolverId> maybe_solver_id;
solvers::SolverOptions preprocessing_solver_options =
options.preprocessing_solver_options.value_or(options.solver_options);
@@ -874,7 +866,7 @@ std::set<EdgeId> GraphOfConvexSets::PreprocessShortestPath(
}

std::vector<MathematicalProgramResult> results =
SolveInParallel(prog_ptrs, nullptr, &preprocessing_solver_options,
SolveInParallel(progs, nullptr, &preprocessing_solver_options,
maybe_solver_id, options.parallelism, false);

for (int i = 0; i < this_batch_nE; ++i) {