diff --git a/perf_tests/test_2dhalo.cpp b/perf_tests/test_2dhalo.cpp index 8b34381b..a82723a5 100644 --- a/perf_tests/test_2dhalo.cpp +++ b/perf_tests/test_2dhalo.cpp @@ -18,13 +18,14 @@ #include "KokkosComm.hpp" +#include #include -void noop(benchmark::State, MPI_Comm) {} +void noop(benchmark::State, const KokkosComm::Communicator &comm) {} template -void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int nx, int ny, int rx, int ry, int rs, - const View &v) { +void send_recv(benchmark::State &, const KokkosComm::Communicator &comm, + const Space &space, int nx, int ny, int rx, int ry, int rs, const View &v) { // 2D index of nbrs in minus and plus direction (periodic) const int xm1 = (rx + rs - 1) % rs; const int ym1 = (ry + rs - 1) % rs; @@ -73,10 +74,11 @@ void benchmark_2dhalo(benchmark::State &state) { int ny = 512; int nprops = 3; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + int rank = comm.rank(); + int size = comm.size(); const int rs = std::sqrt(size); const int rx = rank % rs; const int ry = rank / rs; @@ -86,12 +88,11 @@ void benchmark_2dhalo(benchmark::State &state) { // grid of elements, each with 3 properties, and a radius-1 halo grid_type grid("", nx + 2, ny + 2, nprops); while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, send_recv, space, nx, ny, rx, ry, - rs, grid); + do_iteration(state, comm, send_recv, space, nx, ny, rx, ry, rs, grid); } } else { while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, noop); // do nothing... + do_iteration(state, comm, noop); // do nothing... } } @@ -113,4 +114,4 @@ void benchmark_2dhalo(benchmark::State &state) { // clang-format on } -BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond); \ No newline at end of file +BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond); diff --git a/perf_tests/test_sendrecv.cpp b/perf_tests/test_sendrecv.cpp index c8de8e00..cd6ff1a2 100644 --- a/perf_tests/test_sendrecv.cpp +++ b/perf_tests/test_sendrecv.cpp @@ -19,7 +19,8 @@ #include "KokkosComm.hpp" template -void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, const View &v) { +void send_recv(benchmark::State &, const KokkosComm::Communicator &comm, const Space &space, int rank, + const View &v) { if (0 == rank) { KokkosComm::send(space, v, 1, 0, comm); KokkosComm::recv(space, v, 1, 0, comm); @@ -30,9 +31,11 @@ void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, } void benchmark_sendrecv(benchmark::State &state) { - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { state.SkipWithError("benchmark_sendrecv needs at least 2 ranks"); } @@ -44,10 +47,10 @@ void benchmark_sendrecv(benchmark::State &state) { view_type a("", 1000000); while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, send_recv, space, rank, a); + do_iteration(state, comm, send_recv, space, rank, a); } state.SetBytesProcessed(sizeof(Scalar) * state.iterations() * a.size() * 2); } -BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond); \ No newline at end of file +BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond); diff --git a/perf_tests/test_utils.hpp b/perf_tests/test_utils.hpp index 00e1061c..3208fea0 100644 --- a/perf_tests/test_utils.hpp +++ b/perf_tests/test_utils.hpp @@ -16,15 +16,18 @@ #pragma once +#include #include #include +#include "KokkosComm_communicator.hpp" #include "KokkosComm_include_mpi.hpp" // F is a function that takes (state, MPI_Comm, args...) template -void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args) { +void do_iteration(benchmark::State &state, const KokkosComm::Communicator &comm, + F &&func, Args... args) { using Clock = std::chrono::steady_clock; using Duration = std::chrono::duration; @@ -34,6 +37,6 @@ void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args double max_elapsed_second; double elapsed_seconds = elapsed.count(); - MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm); + MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm.as_raw()); state.SetIterationTime(max_elapsed_second); -} \ No newline at end of file +} diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 84ab90ea..a4e24d75 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -16,31 +16,34 @@ #pragma once +#include #include "KokkosComm_collective.hpp" +#include "KokkosComm_communicator.hpp" #include "KokkosComm_version.hpp" #include "KokkosComm_isend.hpp" #include "KokkosComm_recv.hpp" #include "KokkosComm_send.hpp" #include "KokkosComm_concepts.hpp" #include "KokkosComm_comm_mode.hpp" +#include "KokkosComm_MPI_instance.hpp" #include namespace KokkosComm { template -Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { - return Impl::isend(space, sv, dest, tag, comm); +Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator &comm) { + return Impl::isend(space, sv, dest, tag, comm.as_raw()); } template -void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { - return Impl::send(space, sv, dest, tag, comm); +void send(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator &comm) { + return Impl::send(space, sv, dest, tag, comm.as_raw()); } template -void recv(const ExecSpace &space, RecvView &sv, int src, int tag, MPI_Comm comm) { - return Impl::recv(space, sv, src, tag, comm); +void recv(const ExecSpace &space, RecvView &sv, int src, int tag, const Communicator &comm) { + return Impl::recv(space, sv, src, tag, comm.as_raw()); } } // namespace KokkosComm diff --git a/src/KokkosComm_comm_mode.hpp b/src/KokkosComm_comm_mode.hpp index 51dee1ae..52a21c5a 100644 --- a/src/KokkosComm_comm_mode.hpp +++ b/src/KokkosComm_comm_mode.hpp @@ -21,22 +21,18 @@ namespace KokkosComm { // Scoped enumeration to specify the communication mode of a sending operation. // See section 3.4 of the MPI standard for a complete specification. enum class CommMode { - // Default mode: lets the user override the send operations behavior at - // compile-time. E.g., this can be set to mode "Synchronous" for debug - // builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE. + // Default mode: lets the user override the send operations behavior at compile-time. E.g. this can be set to mode + // "Synchronous" for debug builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE. Default, - // Standard mode: MPI implementation decides whether outgoing messages will - // be buffered. Send operations can be started whether or not a matching - // receive has been started. They may complete before a matching receive is - // started. Standard mode is non-local: successful completion of the send - // operation may depend on the occurrence of a matching receive. + // Standard mode: MPI implementation decides whether outgoing messages will be buffered. Send operations can be + // started whether or not a matching receive has been started. They may complete before a matching receive is started. + // Standard mode is non-local: successful completion of the send operation may depend on the occurrence of a matching + // receive. Standard, - // Ready mode: Send operations may be started only if the matching receive is - // already started. + // Ready mode: Send operations may be started only if the matching receive is already started. Ready, - // Synchronous mode: Send operations complete successfully only if a matching - // receive is started, and the receive operation has started to receive the - // message sent. + // Synchronous mode: Send operations complete successfully only if a matching receive is started, and the receive + // operation has started to receive the message sent. Synchronous, }; diff --git a/src/KokkosComm_communicator.hpp b/src/KokkosComm_communicator.hpp new file mode 100644 index 00000000..c3ecd7c1 --- /dev/null +++ b/src/KokkosComm_communicator.hpp @@ -0,0 +1,81 @@ +//@HEADER +// ************************************************************************ +// +// Kokkos v. 4.0 +// Copyright (2022) National Technology & Engineering +// Solutions of Sandia, LLC (NTESS). +// +// Under the terms of Contract DE-NA0003525 with NTESS, +// the U.S. Government retains certain rights in this software. +// +// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions. +// See https://kokkos.org/LICENSE for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//@HEADER + +#pragma once + +#include "KokkosComm_concepts.hpp" + +#include +#include +#include + +namespace KokkosComm { + +using Color = int; +using Key = int; + +template +class Communicator { + private: + MPI_Comm _comm; + ExecSpace _exec_space; + + public: + Communicator(MPI_Comm comm) : _comm(comm) {} + Communicator(const Communicator& other) = delete; + Communicator(const Communicator&& other) { _comm = std::move(other._comm); } + ~Communicator() { + // Only free the communicator if it hasn't been set to `MPI_COMM_NULL` before. This is to prevent double freeing + // when we explicitly call the communicator's dtor in the `Context` dtor. + if (MPI_COMM_NULL != _comm) { + MPI_Comm_free(&_comm); + } + } + + static auto dup_raw(MPI_Comm raw) -> Communicator { + MPI_Comm new_comm; + MPI_Comm_dup(raw, &new_comm); + return Communicator(new_comm); + } + + static auto dup(const Communicator& other) -> Communicator { return Communicator::dup_raw(other.as_raw()); } + + static auto split_raw(MPI_Comm raw, Color color, Key key) -> Communicator { + MPI_Comm new_comm; + MPI_Comm_split(raw, color, key, &new_comm); + return Communicator(new_comm); + } + + static auto split(const Communicator& other, Color color, Key key) -> Communicator { + return Communicator::split_raw(other.as_raw(), color, key); + } + + inline auto as_raw() const -> MPI_Comm { return _comm; } + + inline auto size(void) const -> int { + int size; + MPI_Comm_size(_comm, &size); + return size; + } + + inline auto rank(void) const -> int { + int rank; + MPI_Comm_rank(_comm, &rank); + return rank; + } +}; + +} // namespace KokkosComm diff --git a/src/impl/KokkosComm_MPI_instance.hpp b/src/impl/KokkosComm_MPI_instance.hpp new file mode 100644 index 00000000..df86776a --- /dev/null +++ b/src/impl/KokkosComm_MPI_instance.hpp @@ -0,0 +1,79 @@ +//@HEADER +// ************************************************************************ +// +// Kokkos v. 4.0 +// Copyright (2022) National Technology & Engineering +// Solutions of Sandia, LLC (NTESS). +// +// Under the terms of Contract DE-NA0003525 with NTESS, +// the U.S. Government retains certain rights in this software. +// +// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions. +// See https://kokkos.org/LICENSE for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//@HEADER + +#pragma once + +#include "KokkosComm_communicator.hpp" +#include "KokkosComm_concepts.hpp" + +#include +#include + +namespace KokkosComm { + +template +class Context { + private: + MPI_Session _shandle; + Communicator _comm; + + public: + Context(MPI_Session shandle, MPI_Comm comm) : _shandle(shandle), _comm(Communicator(comm)) {} + + ~Context() { + // Ensure the session-associated communicator is destroyed before the session is finalized. + _comm.~Communicator(); + MPI_Session_finalize(&_shandle); + } + + auto comm(void) -> const Communicator& { return _comm; } +}; + +template +auto initialize(void) -> Context { + int rc; + + MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; + MPI_Group kokkoscomm_group = MPI_GROUP_NULL; + MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; + MPI_Info kokkoscomm_info = MPI_INFO_NULL; + + MPI_Info_create(&kokkoscomm_info); + + // Set threading level for our session + constexpr char thrd_lvl_key[] = "thread_level"; + constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE"; + MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val); + +#ifdef KOKKOSCOMM_CUDA_AWARE_MPI + // Disable CUDA pointer attribute checks from MPI + constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern"; + constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU"; + MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val); +#endif + + rc = MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); + + constexpr char pset_name[] = "mpi://WORLD"; + MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group); + + MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN, + &kokkoscomm_comm); + + return Context(kokkoscomm_shandle, kokkoscomm_comm); +} + +} // namespace KokkosComm diff --git a/src/impl/KokkosComm_isend.hpp b/src/impl/KokkosComm_isend.hpp index 1e2a8ba4..aaf34450 100644 --- a/src/impl/KokkosComm_isend.hpp +++ b/src/impl/KokkosComm_isend.hpp @@ -25,6 +25,7 @@ #include "KokkosComm_request.hpp" #include "KokkosComm_traits.hpp" #include "KokkosComm_comm_mode.hpp" +#include "KokkosComm_communicator.hpp" // impl #include "KokkosComm_include_mpi.hpp" diff --git a/unit_tests/test_isendirecv.cpp b/unit_tests/test_isendirecv.cpp index 1a351e85..ad58a766 100644 --- a/unit_tests/test_isendirecv.cpp +++ b/unit_tests/test_isendirecv.cpp @@ -38,9 +38,11 @@ void test_1d(const View1D &a) { static_assert(View1D::rank == 1, ""); using Scalar = typename View1D::non_const_value_type; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -49,12 +51,12 @@ void test_1d(const View1D &a) { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; MPI_Request req; - KokkosComm::Impl::irecv(a, src, 0, MPI_COMM_WORLD, req); + KokkosComm::Impl::irecv(a, src, 0, comm.as_raw(), req); MPI_Wait(&req, MPI_STATUS_IGNORE); int errs; Kokkos::parallel_reduce( @@ -68,9 +70,11 @@ void test_2d(const View2D &a) { static_assert(View2D::rank == 2, ""); using Scalar = typename View2D::non_const_value_type; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -82,12 +86,12 @@ void test_2d(const View2D &a) { int dst = 1; Kokkos::parallel_for( policy, KOKKOS_LAMBDA(int i, int j) { a(i, j) = i * a.extent(0) + j; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; MPI_Request req; - KokkosComm::Impl::irecv(a, src, 0, MPI_COMM_WORLD, req); + KokkosComm::Impl::irecv(a, src, 0, comm.as_raw(), req); MPI_Wait(&req, MPI_STATUS_IGNORE); int errs; Kokkos::parallel_reduce( @@ -106,4 +110,4 @@ TYPED_TEST(IsendIrecv, 2D_contig) { test_2d(a); } -} // namespace \ No newline at end of file +} // namespace diff --git a/unit_tests/test_isendrecv.cpp b/unit_tests/test_isendrecv.cpp index 68bd3bf6..d2b0591c 100644 --- a/unit_tests/test_isendrecv.cpp +++ b/unit_tests/test_isendrecv.cpp @@ -38,9 +38,11 @@ void isend_comm_mode_1d_contig() { Kokkos::View a("a", 1000); - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -49,11 +51,11 @@ void isend_comm_mode_1d_contig() { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; Kokkos::parallel_reduce( a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); @@ -71,18 +73,19 @@ void isend_comm_mode_1d_noncontig() { Kokkos::View b("a", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + int rank = comm.rank(); if (0 == rank) { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; Kokkos::parallel_reduce( a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); diff --git a/unit_tests/test_sendrecv.cpp b/unit_tests/test_sendrecv.cpp index 6c4c4ced..74696482 100644 --- a/unit_tests/test_sendrecv.cpp +++ b/unit_tests/test_sendrecv.cpp @@ -15,6 +15,7 @@ //@HEADER #include +#include #include "KokkosComm.hpp" @@ -37,9 +38,11 @@ void send_comm_mode_1d_contig() { Kokkos::View a("a", 1000); - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -48,10 +51,10 @@ void send_comm_mode_1d_contig() { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; Kokkos::parallel_reduce( a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); @@ -69,17 +72,18 @@ void send_comm_mode_1d_noncontig() { Kokkos::View b("b", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); + int rank = comm.rank(); if (0 == rank) { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; Kokkos::parallel_reduce( a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs);