Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion examples/10_streaming_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ int main()
}

// open file for writing
// use QueueFullPolicy = Discard in order to create a situation where from
// the reader's perspective steps are skipped. This tests the bug reported
// in https://github.com/openPMD/openPMD-api/issues/1747.
Series series = Series("electrons.sst", Access::CREATE, R"(
{
"adios2": {
"engine": {
"parameters": {
"DataTransport": "WAN"
"DataTransport": "WAN",
"QueueFullPolicy": "Discard"
}
}
}
Expand Down
10 changes: 3 additions & 7 deletions include/openPMD/IO/ADIOS/ADIOS2File.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class ADIOS2File

size_t currentStep();
void setStepSelection(std::optional<size_t>);
[[nodiscard]] std::optional<size_t> stepSelection() const;
[[nodiscard]] std::optional<size_t> const &stepSelection() const;

[[nodiscard]] detail::AdiosAttributes const &attributes() const
{
Expand All @@ -431,13 +431,9 @@ class ADIOS2File
std::optional<adios2::Engine> m_engine; //! ADIOS engine

/*
* Not all engines support the CurrentStep() call, so we have to
* implement this manually.
* Note: We don't use a std::optional<size_t> here since the currentStep
* is always being counted.
* Used for selecting steps in adios2::Mode::ReadRandomAccesss.
*/
size_t m_currentStep = 0;
bool useStepSelection = false;
std::optional<size_t> m_stepSelection;
std::optional<size_t> m_max_steps_bp5 = std::make_optional<size_t>(100);

/*
Expand Down
39 changes: 11 additions & 28 deletions src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,9 @@ size_t ADIOS2File::currentStep()
{
return *step_selection;
}
else if (nonpersistentEngine(m_impl->m_engineType))
else if (m_mode == adios2::Mode::ReadRandomAccess)
{
return m_currentStep;
return 0;
}
else
{
Expand All @@ -381,28 +381,12 @@ void ADIOS2File::setStepSelection(std::optional<size_t> step)
"ADIOS2 backend: Cannot only use random-access step selections "
"when reading without streaming mode.");
}
if (!step.has_value())
{
m_currentStep = 0;
useStepSelection = false;
}
else
{
m_currentStep = *step;
useStepSelection = true;
}
m_stepSelection = step;
}

std::optional<size_t> ADIOS2File::stepSelection() const
std::optional<size_t> const &ADIOS2File::stepSelection() const
{
if (useStepSelection)
{
return {m_currentStep};
}
else
{
return std::nullopt;
}
return m_stepSelection;
}

void ADIOS2File::configure_IO_Read()
Expand Down Expand Up @@ -1172,7 +1156,6 @@ void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts)
}
engine.EndStep();
engine.BeginStep();
// ++m_currentStep; // think we should keep this as the logical step
m_uniquePtrPuts.clear();
uncommittedAttributes.clear();
m_updateSpans.clear();
Expand Down Expand Up @@ -1257,14 +1240,14 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode)
uncommittedAttributes.clear();
m_updateSpans.clear();
streamStatus = StreamStatus::OutsideOfStep;
++m_currentStep;
return AdvanceStatus::OK;
}
case AdvanceMode::BEGINSTEP: {
adios2::StepStatus adiosStatus{};
auto &engine = getEngine();

auto check_bp5 = [&]() -> bool {
std::string engineType = getEngine().Type();
std::string engineType = engine.Type();
std::transform(
engineType.begin(),
engineType.end(),
Expand All @@ -1273,7 +1256,7 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode)
return engineType == "bp5writer";
};

if (this->m_currentStep == 0)
if (engine.CurrentStep() == 0)
{
int max_steps_from_env =
auxiliary::getEnvNum("OPENPMD_BP5_GROUPENCODING_MAX_STEPS", -1);
Expand All @@ -1293,7 +1276,7 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode)
if (this->m_impl->m_handler->m_encoding ==
IterationEncoding::groupBased &&
this->m_max_steps_bp5.has_value() &&
this->m_currentStep >= *this->m_max_steps_bp5 &&
engine.CurrentStep() >= *this->m_max_steps_bp5 &&
(this->m_mode == adios2::Mode::Write ||
this->m_mode == adios2::Mode::Append) &&
check_bp5())
Expand Down Expand Up @@ -1329,7 +1312,7 @@ Be aware of the performance implications described above.)");

if (streamStatus != StreamStatus::DuringStep)
{
adiosStatus = getEngine().BeginStep();
adiosStatus = engine.BeginStep();
}
else
{
Expand Down Expand Up @@ -1403,7 +1386,7 @@ ADIOS2File::availableVariablesPrefixed(std::string const &prefix)
ADIOS2File::AttributeMap_t const &ADIOS2File::availableVariables()
{
return m_variables.availableVariables(
currentStep(), useStepSelection, m_IO);
currentStep(), stepSelection().has_value(), m_IO);
}

void ADIOS2File::markActive(Writable *writable)
Expand Down
Loading