diff --git a/docs/source/details/backendconfig.rst b/docs/source/details/backendconfig.rst
index 123b0a58e0..30b5e4f779 100644
--- a/docs/source/details/backendconfig.rst
+++ b/docs/source/details/backendconfig.rst
@@ -190,6 +190,17 @@ Explanation of the single keys:
   * ``type`` supported ADIOS operator type, e.g. zfp, sz
   * ``parameters`` is an associative map of string parameters for the operator (e.g. compression levels)
+* ``adios2.dataset.shape`` (advanced): Specify the `dataset shape <https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes>`_ for the ADIOS2 variable.
+  Note that a variable's shape generally implies a different way of interacting with it, and some shapes (such as *joined arrays*) are not selected via this parameter, but via different API calls instead.
+  This parameter's purpose is to select different implementations for the same API call.
+  Supported values are:
+
+  * ``"global_array"`` (default): The variable is an (n-dimensional) array with a globally defined size. Local blocks, i.e. subsets of the global region, are written by parallel writers.
+  * ``"local_value"``: Each parallel writer contributes one single value to the dataset, and the values are joined into a 1-dimensional array.
+    Since the openPMD-api (currently) has no dedicated API call for this shape, this setting is useful purely as an optimization: "local value" variables participate in ADIOS2 metadata aggregation.
+    It can only be applied if the global shape (a 1-dimensional array with the same length as the number of parallel instances) and the local blocks (a single data item each) are specified accordingly.
+    Use global or joined arrays otherwise.
+
 * ``adios2.use_span_based_put``: The openPMD-api exposes the `span-based Put() API `_ of ADIOS2 via an overload of ``RecordComponent::storeChunk()``.
   This API is incompatible with compression operators as described above.
   The openPMD-api will automatically use a fallback implementation for the span-based Put() API if any operator is added to a dataset.
diff --git a/examples/10_streaming_read.cpp b/examples/10_streaming_read.cpp
index eb02f3b393..90d0d6fe44 100644
--- a/examples/10_streaming_read.cpp
+++ b/examples/10_streaming_read.cpp
@@ -1,3 +1,4 @@
+#include <openPMD/auxiliary/StringManip.hpp>
 #include <openPMD/openPMD.hpp>
 
 #include <iostream>
@@ -55,6 +56,25 @@ int main()
             extents[i] = rc.getExtent();
         }
 
+        auto e_patches = iteration.particles["e"].particlePatches;
+        for (auto key :
+             {"numParticles", "numParticlesOffset", "offset", "extent"})
+        {
+            for (auto &rc : e_patches[key])
+            {
+                std::cout << "Chunks for '" << rc.second.myPath().openPMDPath()
+                          << "':";
+                for (auto const &chunk : rc.second.availableChunks())
+                {
+                    std::cout << "\n\tRank " << chunk.sourceID << "\t"
+                              << auxiliary::vec_as_string(chunk.offset)
+                              << "\t– "
+                              << auxiliary::vec_as_string(chunk.extent);
+                }
+                std::cout << std::endl;
+            }
+        }
+
         // The iteration can be closed in order to help free up resources.
         // The iteration's content will be flushed automatically.
         // An iteration once closed cannot (yet) be reopened.
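For reference, the documented ``adios2.dataset.shape`` key is passed through the dataset-specific options string of ``resetDataset()``/``Dataset``. A minimal sketch (``rc``, ``mpi_rank`` and ``mpi_size`` are placeholders for a patch record component and the usual MPI quantities; includes and surrounding code are omitted; the TOML shorthand used in this changeset and the JSON spelling below should be equivalent)::

    // TOML-style shorthand, as used in the examples and tests of this patch
    rc.resetDataset(
        {openPMD::Datatype::ULONG,
         {openPMD::Extent::value_type(mpi_size)},
         R"(adios2.dataset.shape = "local_value")"});

    // equivalent JSON spelling of the same option
    // rc.resetDataset(
    //     {openPMD::Datatype::ULONG,
    //      {openPMD::Extent::value_type(mpi_size)},
    //      R"({"adios2": {"dataset": {"shape": "local_value"}}})"});

    // each rank then contributes exactly one element at its own index
    rc.storeChunk(
        std::make_unique<unsigned long>(42ul), {size_t(mpi_rank)}, {1});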
diff --git a/examples/10_streaming_write.cpp b/examples/10_streaming_write.cpp
index d64bee6d79..a74fb88d79 100644
--- a/examples/10_streaming_write.cpp
+++ b/examples/10_streaming_write.cpp
@@ -1,5 +1,3 @@
-#include "openPMD/Series.hpp"
-#include "openPMD/snapshots/Snapshots.hpp"
 #include <openPMD/openPMD.hpp>
 
 #include <iostream>
@@ -7,6 +5,10 @@
 #include <memory>
 #include <numeric> // std::iota
 
+#if openPMD_HAVE_MPI
+#include <mpi.h>
+#endif
+
 using std::cout;
 using namespace openPMD;
 
@@ -21,8 +23,22 @@ int main()
         return 0;
     }
 
+    int mpi_rank{0}, mpi_size{1};
+
+#if openPMD_HAVE_MPI
+    MPI_Init(nullptr, nullptr);
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
+#endif
+
     // open file for writing
-    Series series = Series("electrons.sst", Access::CREATE, R"(
+    Series series = Series(
+        "electrons.sst",
+        Access::CREATE,
+#if openPMD_HAVE_MPI
+        MPI_COMM_WORLD,
+#endif
+        R"(
 {
   "adios2": {
     "engine": {
@@ -31,11 +47,13 @@ int main()
       }
     }
   }
-})");
+})"
+
+    );
 
     Datatype datatype = determineDatatype<position_t>();
     constexpr unsigned long length = 10ul;
-    Extent global_extent = {length};
+    Extent global_extent = {mpi_size * length};
     Dataset dataset = Dataset(datatype, global_extent);
     std::shared_ptr<position_t> local_data(
         new position_t[length], [](position_t const *ptr) { delete[] ptr; });
@@ -51,13 +69,67 @@ int main()
         Iteration iteration = iterations[i];
         Record electronPositions = iteration.particles["e"]["position"];
 
-        std::iota(local_data.get(), local_data.get() + length, i * length);
+        std::iota(
+            local_data.get(),
+            local_data.get() + length,
+            i * length * mpi_size + mpi_rank * length);
         for (auto const &dim : {"x", "y", "z"})
        {
             RecordComponent pos = electronPositions[dim];
             pos.resetDataset(dataset);
-            pos.storeChunk(local_data, Offset{0}, global_extent);
+            pos.storeChunk(local_data, Offset{length * mpi_rank}, {length});
+        }
+
+        // Use the `local_value` ADIOS2 dataset shape to send a dataset not via
+        // the data plane, but via the control plane of ADIOS2 SST. This is
+        // advisable for datasets where each rank contributes only a single
+        // item, since the control plane performs data aggregation, thus
+        // avoiding fully interconnected communication meshes for data that
+        // needs to be read by each reader. A local value dataset can only
+        // contain a single item per MPI rank, forming an array of length
+        // equal to the MPI size.
+        // https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
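The ``local_value`` shape used above maps to ADIOS2's *local value* variables. As a rough stand-alone illustration of what the openPMD-api ADIOS2 backend does underneath (assuming the ADIOS2 2.9 C++ API; the IO, engine and file names are purely illustrative)::

    #include <adios2.h>
    #include <mpi.h>

    #include <cstdint>

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);
        int rank{0};
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        adios2::ADIOS adios(MPI_COMM_WORLD);
        adios2::IO io = adios.DeclareIO("sketch");
        // one value per writing rank, no global shape/offset bookkeeping
        auto var = io.DefineVariable<std::uint64_t>(
            "numParticles", {adios2::LocalValueDim});

        adios2::Engine engine = io.Open("sketch.bp", adios2::Mode::Write);
        engine.BeginStep();
        std::uint64_t value = 10; // this rank's single contribution
        engine.Put(var, value);
        engine.EndStep();
        engine.Close();

        // Readers see "numParticles" as a 1-D global array whose length equals
        // the number of writing ranks, with each rank's value at index == rank.
        MPI_Finalize();
        return 0;
    }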
+        auto e_patches = iteration.particles["e"].particlePatches;
+        auto numParticles = e_patches["numParticles"];
+        auto numParticlesOffset = e_patches["numParticlesOffset"];
+        for (auto rc : {&numParticles, &numParticlesOffset})
+        {
+            rc->resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type(mpi_size)},
+                 R"(adios2.dataset.shape = "local_value")"});
+        }
+        numParticles.storeChunk(
+            std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
+        numParticlesOffset.storeChunk(
+            std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
+            {size_t(mpi_rank)},
+            {1});
+        auto offset = e_patches["offset"];
+        for (auto const &dim : {"x", "y", "z"})
+        {
+            auto rc = offset[dim];
+            rc.resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type(mpi_size)},
+                 R"(adios2.dataset.shape = "local_value")"});
+            rc.storeChunk(
+                std::make_unique<unsigned long>((unsigned long)mpi_rank),
+                {size_t(mpi_rank)},
+                {1});
+        }
+        auto extent = e_patches["extent"];
+        for (auto const &dim : {"x", "y", "z"})
+        {
+            auto rc = extent[dim];
+            rc.resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type(mpi_size)},
+                 R"(adios2.dataset.shape = "local_value")"});
+            rc.storeChunk(
+                std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
+        }
+
         iteration.close();
     }
 
@@ -69,6 +141,10 @@ int main()
      */
     series.close();
 
+#if openPMD_HAVE_MPI
+    MPI_Finalize();
+#endif
+
     return 0;
 #else
     std::cout << "The streaming example requires that openPMD has been built "
diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
index f2280433fd..7f25eb0f85 100644
--- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
+++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
@@ -38,7 +38,6 @@
 #include "openPMD/auxiliary/StringManip.hpp"
 #include "openPMD/backend/Writable.hpp"
 #include "openPMD/config.hpp"
-#include
 
 #if openPMD_HAVE_ADIOS2
 #include <adios2.h>
@@ -304,9 +303,12 @@ class ADIOS2IOHandlerImpl
         adios2::Params params;
     };
 
-    std::vector<ParameterizedOperator> defaultOperators;
+    // read operators can (currently) not be specified per dataset, so parse
+    // them once and then buffer them
+    std::vector<ParameterizedOperator> readOperators;
 
     json::TracingJSON m_config;
+    std::optional<nlohmann::json> m_buffered_dataset_config;
     static json::TracingJSON nullvalue;
 
     template
@@ -345,9 +347,19 @@ class ADIOS2IOHandlerImpl
     // use m_config
     std::optional<std::vector<ParameterizedOperator>> getOperators();
 
+    enum class Shape
+    {
+        GlobalArray,
+        LocalValue
+    };
+
     template
-    std::vector<ParameterizedOperator> getDatasetOperators(
-        Parameter const &, Writable *, std::string const &varName);
+    auto parseDatasetConfig(
+        Parameter const &,
+        Writable *,
+        std::string const &varName,
+        std::vector<ParameterizedOperator> default_operators = {})
+        -> std::tuple<std::vector<ParameterizedOperator>, Shape>;
 
     std::string fileSuffix(bool verbose = true) const;
 
@@ -546,6 +558,10 @@ class ADIOS2IOHandlerImpl
         }
         // TODO leave this check to ADIOS?
         adios2::Dims shape = var.Shape();
+        if (shape == adios2::Dims{adios2::LocalValueDim})
+        {
+            return var;
+        }
         auto actualDim = shape.size();
         {
             auto requiredDim = extent.size();
diff --git a/include/openPMD/auxiliary/StringManip.hpp b/include/openPMD/auxiliary/StringManip.hpp
index eb3799d3be..ede4ab7152 100644
--- a/include/openPMD/auxiliary/StringManip.hpp
+++ b/include/openPMD/auxiliary/StringManip.hpp
@@ -243,6 +243,14 @@ namespace auxiliary
         return std::forward(s);
     }
 
+    /** Write a string representation of a vector or another iterable
+     * container to a stream.
+     *
+     * @param s The stream to write to.
+     * @param vec The vector or other iterable container.
+     * @return The modified stream. Each item is
+     *         formatted using the default definition for operator<<().
+     */
     template <typename Stream, typename Vec>
     auto write_vec_to_stream(Stream &&s, Vec const &vec) -> Stream &&
     {
@@ -265,6 +273,13 @@ namespace auxiliary
         return std::forward<Stream>(s);
     }
 
+    /** Create a string representation of a vector or another iterable
+     * container.
+     *
+     * @param vec The vector or other iterable container.
+     * @return A string that shows the items of the container. Each item is
+     *         formatted using the default definition for operator<<().
+     */
     template <typename Vec>
     auto vec_as_string(Vec const &vec) -> std::string
     {
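A short usage sketch for the two helpers documented above (the exact output format is whatever the items' ``operator<<()`` produces, e.g. a bracketed, comma-separated list)::

    #include <openPMD/auxiliary/StringManip.hpp>

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::uint64_t> extent{128, 128, 64};
        // stream the container directly, printing e.g. "[128, 128, 64]"
        openPMD::auxiliary::write_vec_to_stream(std::cout, extent) << '\n';
        // or obtain the same representation as a std::string
        std::string s = openPMD::auxiliary::vec_as_string(extent);
        std::cout << s << '\n';
    }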
diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp
index d0ab669ea6..5c2f504b34 100644
--- a/src/IO/ADIOS/ADIOS2File.cpp
+++ b/src/IO/ADIOS/ADIOS2File.cpp
@@ -109,7 +109,25 @@ void WriteDataset::call(ADIOS2File &ba, detail::BufferedPut &bp)
                 std::nullopt,
                 ba.variables());
 
-            engine.Put(var, ptr);
+            // https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
+            if (var.Shape() == adios2::Dims{adios2::LocalValueDim})
+            {
+                if (bp.param.extent != Extent{1})
+                {
+                    throw error::OperationUnsupportedInBackend(
+                        "ADIOS2",
+                        "Can only write a single element to LocalValue "
+                        "variables (extent == Extent{1}, but extent of '" +
+                            bp.name + "' was " +
+                            auxiliary::vec_as_string(bp.param.extent) + ").");
+                }
+                engine.Put(var, *ptr);
+            }
+            else
+            {
+                engine.Put(var, ptr);
+            }
         }
         else if constexpr (std::is_same_v<
                                ptr_type,
@@ -180,7 +198,24 @@ struct RunUniquePtrPut
             bufferedPut.name,
             std::nullopt,
             ba.variables());
-        engine.Put(var, ptr);
+        // https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
+        if (var.Shape() == adios2::Dims{adios2::LocalValueDim})
+        {
+            if (bufferedPut.extent != Extent{1})
+            {
+                throw error::OperationUnsupportedInBackend(
+                    "ADIOS2",
+                    "Can only write a single element to LocalValue "
+                    "variables (extent == Extent{1}, but extent of '" +
+                        bufferedPut.name + "' was " +
+                        auxiliary::vec_as_string(bufferedPut.extent) + ").");
+            }
+            engine.Put(var, *ptr);
+        }
+        else
+        {
+            engine.Put(var, ptr);
+        }
     }
 
     static constexpr char const *errorMsg = "RunUniquePtrPut";
diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp
index 44e862ffe2..749400e179 100644
--- a/src/IO/ADIOS/ADIOS2IOHandler.cpp
+++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp
@@ -35,6 +35,7 @@
 #include "openPMD/ThrowError.hpp"
 #include "openPMD/auxiliary/Environment.hpp"
 #include "openPMD/auxiliary/Filesystem.hpp"
+#include "openPMD/auxiliary/JSON.hpp"
 #include "openPMD/auxiliary/JSONMatcher.hpp"
 #include "openPMD/auxiliary/JSON_internal.hpp"
 #include "openPMD/auxiliary/Mpi.hpp"
@@ -232,6 +233,24 @@ void ADIOS2IOHandlerImpl::init(
             groupTableViaEnv == 0 ? UseGroupTable::No : UseGroupTable::Yes;
     }
 
+    {
+        constexpr char const *const init_json_shadow_str = R"(
+            {
+              "adios2": {
+                "dataset": {
+                  "operators": null,
+                  "shape": null
+                }
+              })";
+        auto init_json_shadow = nlohmann::json::parse(init_json_shadow_str);
+        std::cout << "Will merge:\n"
+                  << init_json_shadow << "\ninto:\n"
+                  << cfg.getShadow() << std::endl;
+        json::merge_internal(
+            cfg.getShadow(), init_json_shadow, /* do_prune = */ false);
+    }
+
     if (cfg.json().contains("adios2"))
     {
         m_config = cfg["adios2"];
@@ -308,7 +327,7 @@ void ADIOS2IOHandlerImpl::init(
         auto operators = getOperators();
         if (operators)
         {
-            defaultOperators = std::move(operators.value());
+            readOperators = std::move(operators.value());
         }
     }
 }
@@ -377,32 +396,111 @@ ADIOS2IOHandlerImpl::getOperators()
 }
 
 template
-auto ADIOS2IOHandlerImpl::getDatasetOperators(
-    Parameter const &parameters, Writable *writable, std::string const &varName)
-    -> std::vector<ParameterizedOperator>
+auto ADIOS2IOHandlerImpl::parseDatasetConfig(
+    Parameter const &parameters,
+    Writable *writable,
+    std::string const &varName,
+    std::vector<ParameterizedOperator> operators)
+    -> std::tuple<std::vector<ParameterizedOperator>, Shape>
 {
-    std::vector<ParameterizedOperator> operators;
-    json::TracingJSON options =
-        parameters.template compileJSONConfig(
-            writable, *m_handler->jsonMatcher, "adios2");
-    if (options.json().contains("adios2"))
-    {
-        json::TracingJSON datasetConfig(options["adios2"]);
-        auto datasetOperators = getOperators(datasetConfig);
+    json::TracingJSON parsedConfig = [&]() -> json::ParsedConfig {
+        if (!m_buffered_dataset_config.has_value())
+        {
+            // we are only interested in these values from the global config
+            constexpr char const *const mask_for_global_conf = R"(
+                {
+                  "dataset": {
+                    "operators": null,
+                    "shape": null
+                  }
+                })";
+            m_buffered_dataset_config = m_config.json();
+            json::filterByTemplate(
+                *m_buffered_dataset_config,
+                nlohmann::json::parse(mask_for_global_conf));
+        }
+        auto const &buffered_config = *m_buffered_dataset_config;
+        auto parsed_config =
+            parameters.template compileJSONConfig(
+                writable, *m_handler->jsonMatcher, "adios2");
+        if (auto adios2_config_it = parsed_config.config.find("adios2");
+            adios2_config_it != parsed_config.config.end())
+        {
+            auto copy = buffered_config;
+            json::merge_internal(
+                copy, adios2_config_it.value(), /* do_prune = */ false);
+            copy = nlohmann::json{{"adios2", std::move(copy)}};
+            parsed_config.config = std::move(copy);
+        }
+        else
+        {
+            parsed_config.config["adios2"] = buffered_config;
+        }
+        return parsed_config;
+    }();
+
+    Shape arrayShape = Shape::GlobalArray;
+    [&]() {
+        if (!parsedConfig.json().contains("adios2"))
+        {
+            return;
+        };
+        json::TracingJSON adios2Config(parsedConfig["adios2"]);
+        auto datasetOperators = getOperators(adios2Config);
+        if (datasetOperators.has_value())
+        {
+            operators = std::move(*datasetOperators);
+        }
+        if (!adios2Config.json().contains("dataset"))
+        {
+            return;
+        }
+        auto datasetConfig = adios2Config["dataset"];
+        if (!datasetConfig.json().contains("shape"))
+        {
+            return;
+        }
+        auto maybe_shape =
+            json::asLowerCaseStringDynamic(datasetConfig["shape"].json());
+        if (!maybe_shape.has_value())
+        {
+            throw error::BackendConfigSchema(
+                {"adios2", "dataset", "shape"},
+                "Must be convertible to string type.");
+        }
+        auto const &shape = *maybe_shape;
+        if (shape == "global_array")
+        {
+            arrayShape = Shape::GlobalArray;
+        }
+        else if (shape == "local_value")
+        {
+            arrayShape = Shape::LocalValue;
+        }
+        else
+        {
+            throw error::BackendConfigSchema(
+                {"adios2", "dataset", "shape"},
+                "Unknown value: '" + shape + "'.");
+        }
+    }();
+
+#if 0
+    std::cout << "Operations for '" << varName << "':";
"Operations for '" << varName << "':"; + for(auto const & op: operators) + { + std::cout << " '" << op.op.Type() << "'"; + } + std::cout << std::endl; +#endif - operators = datasetOperators ? std::move(datasetOperators.value()) - : defaultOperators; - } - else - { - operators = defaultOperators; - } parameters.warnUnusedParameters( - options, + parsedConfig, "adios2", "Warning: parts of the backend configuration for ADIOS2 dataset '" + varName + "' remain unused:\n"); - return operators; + + return {std::move(operators), arrayShape}; } using AcceptedEndingsForEngine = std::map; @@ -814,15 +912,51 @@ void ADIOS2IOHandlerImpl::createDataset( filePos->gd = GroupOrDataset::DATASET; auto const varName = nameOfVariable(writable); - std::vector operators = - getDatasetOperators(parameters, writable, varName); + // Captured structured bindings are a C++20 extension... + std::vector operators; + Shape arrayShape; + std::tie(operators, arrayShape) = + parseDatasetConfig(parameters, writable, varName); - // cast from openPMD::Extent to adios2::Dims - adios2::Dims shape(parameters.extent.begin(), parameters.extent.end()); - if (auto jd = parameters.joinedDimension; jd.has_value()) - { - shape[jd.value()] = adios2::JoinedDim; - } + adios2::Dims shape = [&]() { + switch (arrayShape) + { + + case Shape::GlobalArray: { + // cast from openPMD::Extent to adios2::Dims + adios2::Dims res( + parameters.extent.begin(), parameters.extent.end()); + if (auto jd = parameters.joinedDimension; jd.has_value()) + { + res[jd.value()] = adios2::JoinedDim; + } + return res; + } + case Shape::LocalValue: { + int required_size = 1; +#if openPMD_HAVE_MPI + if (m_communicator.has_value()) + { + MPI_Comm_size(*m_communicator, &required_size); + } +#endif + if (parameters.extent != + Extent{Extent::value_type(required_size)}) + { + throw error::OperationUnsupportedInBackend( + "ADIOS2", + "Shape for local value array must be a 1D array " + "equivalent to the MPI size ('" + + varName + "' has shape " + + auxiliary::vec_as_string(parameters.extent) + + ", but should have shape [" + + std::to_string(required_size) + "])."); + } + return adios2::Dims{adios2::LocalValueDim}; + } + } + throw std::runtime_error("Unreachable!"); + }(); auto &fileData = getFileData(file, IfFileNotOpen::ThrowError); @@ -1087,8 +1221,8 @@ void ADIOS2IOHandlerImpl::openDataset( * reading, so the dataset-specific configuration should still be explored * here. 
      */
-    std::vector<ParameterizedOperator> operators =
-        getDatasetOperators(parameters, writable, varName);
+    [[maybe_unused]] auto [operators, _] =
+        parseDatasetConfig(parameters, writable, varName, readOperators);
 
     switchAdios2VariableType(
         *parameters.dtype,
         this,
@@ -1192,6 +1326,11 @@ namespace detail
             varName,
             std::nullopt,
             ba.variables());
+        if (variable.Shape() == adios2::Dims{adios2::LocalValueDim})
+        {
+            params.out->backendManagedBuffer = false;
+            return;
+        }
         adios2::Dims offset(params.offset.begin(), params.offset.end());
         adios2::Dims extent(params.extent.begin(), params.extent.end());
         variable.SetSelection({std::move(offset), std::move(extent)});
@@ -2426,7 +2565,23 @@ namespace detail
         }
         else
         {
-            var.SetShape(shape);
+            auto const &old_shape = var.Shape();
+            bool shape_changed = old_shape.size() != shape.size();
+            if (!shape_changed)
+            {
+                for (size_t i = 0; i < old_shape.size(); ++i)
+                {
+                    if (old_shape[i] != shape[i])
+                    {
+                        shape_changed = true;
+                        break;
+                    }
+                }
+            }
+            if (shape_changed)
+            {
+                var.SetShape(shape);
+            }
 
             if (count.size() > 0)
             {
                 var.SetSelection({start, count});
diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp
index b3d93a9289..64cb15bdb7 100644
--- a/test/ParallelIOTest.cpp
+++ b/test/ParallelIOTest.cpp
@@ -7,6 +7,7 @@
 #include "openPMD/IO/Access.hpp"
 #include "openPMD/auxiliary/Environment.hpp"
 #include "openPMD/auxiliary/Filesystem.hpp"
+#include "openPMD/backend/PatchRecordComponent.hpp"
 #include "openPMD/openPMD.hpp"
 
 #include
@@ -387,7 +388,7 @@ void available_chunks_test(std::string const &file_ending)
     MPI_Comm_size(MPI_COMM_WORLD, &r_mpi_size);
     unsigned mpi_rank{static_cast<unsigned>(r_mpi_rank)},
         mpi_size{static_cast<unsigned>(r_mpi_size)};
-    std::string name = "../samples/available_chunks." + file_ending;
+    std::string name = "../samples/parallel_available_chunks." + file_ending;
 
     /*
      * ADIOS2 assigns writerIDs to blocks in a BP file by id of the substream
@@ -400,7 +401,6 @@
 {
   "engine":
   {
-    "type": "bp4",
     "parameters":
     {
       "NumAggregators":)END"
@@ -420,6 +420,56 @@
         auto E_x = it0.meshes["E"]["x"];
         E_x.resetDataset({Datatype::INT, {mpi_size, 4}});
         E_x.storeChunk(data, {mpi_rank, 0}, {1, 4});
+
+        /*
+         * Verify that block decomposition also works in "local value" variable
+         * shape. That shape instructs the data to participate in ADIOS2
+         * metadata aggregation, hence there is only one "real" written block,
+         * the aggregated one. We still need the original logical blocks to be
+         * present in reading.
+         */
+
+        auto electrons = it0.particles["e"].particlePatches;
+        auto numParticles = electrons["numParticles"];
+        auto numParticlesOffset = electrons["numParticlesOffset"];
+        for (auto rc : {&numParticles, &numParticlesOffset})
+        {
+            rc->resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type{mpi_size}},
+                 R"(adios2.dataset.shape = "local_value")"});
+        }
+        numParticles.storeChunk(
+            std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
+        numParticlesOffset.storeChunk(
+            std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
+            {size_t(mpi_rank)},
+            {1});
+        auto offset = electrons["offset"];
+        for (auto const &dim : {"x", "y", "z"})
+        {
+            auto rc = offset[dim];
+            rc.resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type{mpi_size}},
+                 R"(adios2.dataset.shape = "local_value")"});
+            rc.storeChunk(
+                std::make_unique<unsigned long>((unsigned long)mpi_rank),
+                {size_t(mpi_rank)},
+                {1});
+        }
+        auto extent = electrons["extent"];
+        for (auto const &dim : {"x", "y", "z"})
+        {
+            auto rc = extent[dim];
+            rc.resetDataset(
+                {Datatype::ULONG,
+                 {Extent::value_type{mpi_size}},
+                 R"(adios2.dataset.shape = "local_value")"});
+            rc.storeChunk(
+                std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
+        }
+
         it0.close();
     }
 
@@ -451,12 +501,40 @@
         {
             REQUIRE(ranks[i] == i);
         }
+
+        auto electrons = it0.particles["e"].particlePatches;
+        for (PatchRecordComponent *prc :
+             {static_cast<PatchRecordComponent *>(&electrons["numParticles"]),
+              static_cast<PatchRecordComponent *>(
+                  &electrons["numParticlesOffset"]),
+              &electrons["offset"]["x"],
+              &electrons["offset"]["y"],
+              &electrons["offset"]["z"],
+              &electrons["extent"]["x"],
+              &electrons["extent"]["y"],
+              &electrons["extent"]["z"]})
+        {
+            auto available_chunks = prc->availableChunks();
+            REQUIRE(size_t(r_mpi_size) == available_chunks.size());
+            for (size_t i = 0; i < available_chunks.size(); ++i)
+            {
+                auto const &chunk = available_chunks[i];
+                REQUIRE(chunk.extent == Extent{1});
+                REQUIRE(chunk.offset == Offset{i});
+                REQUIRE(chunk.sourceID == i);
+            }
+        }
     }
 }
 
 TEST_CASE("available_chunks_test", "[parallel][adios]")
 {
+#if HAS_ADIOS_2_9
+    available_chunks_test("bp4");
+    available_chunks_test("bp5");
+#else
     available_chunks_test("bp");
+#endif
 }
 #endif
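For completeness, a rough serial read-back sketch for patch records written with the ``local_value`` shape, assuming ``PatchRecordComponent`` exposes the usual ``loadChunk()``/``getExtent()`` chunk API as used in the test above; file name and iteration index are illustrative::

    #include <openPMD/openPMD.hpp>

    #include <iostream>

    int main()
    {
        using namespace openPMD;
        Series read(
            "../samples/parallel_available_chunks.bp5", Access::READ_ONLY);
        auto patches = read.iterations[0].particles["e"].particlePatches;
        // request the whole 1-D array, one entry per writing rank
        auto numParticles = patches["numParticles"].loadChunk<unsigned long>();
        read.flush();
        // Regardless of the "local_value" optimization used on the write side,
        // the data reads back as an ordinary global array.
        auto n = patches["numParticles"].getExtent()[0];
        for (size_t rank = 0; rank < n; ++rank)
        {
            std::cout << "Rank " << rank << " wrote "
                      << numParticles.get()[rank] << " particle(s)\n";
        }
    }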