3 changes: 3 additions & 0 deletions Docs/source/usage/parameters.rst
@@ -2928,6 +2928,9 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a
``variable based`` is an `experimental feature with ADIOS2 BP5 <https://openpmd-api.readthedocs.io/en/0.16.1/backends/adios2.html#experimental-new-adios2-schema>`__ that will replace ``g``.
Default: ``f`` (full diagnostics)

* ``<diag_name>.buffer_flush_limit_btd`` (`integer`; defaults to 5) optional, only read if ``<diag_name>.diag_type = BackTransformed``
This parameter is intended for the ADIOS2 backend: every N back-transformed buffers (N is the value of this parameter) are grouped and then flushed to disk, as in the example below.
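
A hypothetical input snippet (a sketch only; the diagnostic name ``diag1`` and the value ``10`` are illustrative, not taken from this PR)::

    diag1.diag_type = BackTransformed
    diag1.format = openpmd
    diag1.openpmd_backend = bp5
    diag1.buffer_flush_limit_btd = 10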

* ``<diag_name>.adios2_operator.type`` (``zfp``, ``blosc``) optional,
`ADIOS2 I/O operator type <https://openpmd-api.readthedocs.io/en/0.16.1/details/backendconfig.html#adios2>`__ for `openPMD <https://www.openPMD.org>`_ data dumps.

5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H
@@ -53,6 +53,11 @@ public:
private:
/** This is responsible for dumping to file */
std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter;

/** This parameter corresponds to the input option
"buffer_flush_limit_btd" at the diagnostic level.
By default we flush every 5 buffers per snapshot */
int m_NumAggBTDBufferToFlush = 5;
};

#endif // WARPX_FLUSHFORMATOPENPMD_H_
5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp
@@ -64,6 +64,8 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name)
ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg);
encoding = openPMD::IterationEncoding::groupBased;
}

pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush);
}
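
The query above uses AMReX's ParmParse. A minimal standalone sketch of the same read-with-default pattern (assumptions: AMReX is available and the diagnostic is named "diag1"; both are illustrative, not taken from this PR):

#include <AMReX.H>
#include <AMReX_ParmParse.H>
#include <AMReX_Print.H>

int main (int argc, char* argv[])
{
    amrex::Initialize(argc, argv);
    {
        // Reads "diag1.buffer_flush_limit_btd" from the inputs file or command line;
        // the default of 5 is kept when the option is absent.
        int numAggBTDBufferToFlush = 5;
        amrex::ParmParse pp_diag_name("diag1");
        pp_diag_name.query("buffer_flush_limit_btd", numAggBTDBufferToFlush);
        amrex::Print() << "buffer_flush_limit_btd = " << numAggBTDBufferToFlush << "\n";
    }
    amrex::Finalize();
    return 0;
}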

//
@@ -176,6 +178,9 @@ FlushFormatOpenPMD::WriteToFile (
m_OpenPMDPlotWriter->WriteOpenPMDParticles(
particle_diags, static_cast<amrex::Real>(time), use_pinned_pc, isBTD, isLastBTDFlush);

if (isBTD && (bufferID % m_NumAggBTDBufferToFlush == 0) )
m_OpenPMDPlotWriter->FlushBTDToDisk();

// signal that no further updates will be written to this iteration
m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush);
}
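
A minimal standalone sketch of the flush cadence implemented above (an illustration only, not WarpX code; it assumes bufferID counts up from 0 within a snapshot):

#include <iostream>

int main ()
{
    int const N = 5;  // <diag_name>.buffer_flush_limit_btd
    for (int bufferID = 0; bufferID < 12; ++bufferID) {
        if (bufferID % N == 0) {
            std::cout << "buffer " << bufferID << " -> FlushBTDToDisk()\n";
        } else {
            std::cout << "buffer " << bufferID << " -> keep aggregating in the ADIOS buffer\n";
        }
    }
    return 0;
}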
23 changes: 21 additions & 2 deletions Source/Diagnostics/WarpXOpenPMD.H
@@ -152,6 +152,21 @@ public:
/** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/
std::string OpenPMDFileType () { return m_OpenPMDFileType; }

/** Ensure BTD buffers are written to disk
*
* This function can be called to ensure that ADIOS-buffered "steps"
* are written to disk intermediately, together with valid metadata if checkpointing is required.
*
* This is needed to read partial data while a simulation is running or
* to support restarting (the BTD diagnostics) in WarpX, so it
* can continue to append to a partially written labframe station
* after restart.
*
* The frequency is controlled by
* FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (defaults to 5).
* It can be adjusted in the input file: diag_name.buffer_flush_limit_btd.
*/
void FlushBTDToDisk ();
private:
void Init (openPMD::Access access, bool isBTD);

@@ -181,14 +196,18 @@ private:
* @param[in] isBTD if the current diagnostic is BTD
*
* if isBTD=false, apply the default flush behaviour
* if isBTD=true, advice to use ADIOS Put() instead of PDW for better performance.
* in ADIOS, the action will be PerformDataWrite
* if isBTD=true, in ADIOS the action will be PerformPut,
* because no further action is needed for the span-based stores.
Review comment (Member): Is this generally true? You only add spans for fields right now; what if particles are involved?

Reply (Contributor Author): You are right, BTD particles will be calling PerformPut. Right now particles are not the I/O performance bottleneck for BTD; it will only matter with an enormous number of particles. I will definitely revisit this issue in another pull request.

* This way we can aggregate buffers before
* calling FlushBTDToDisk() to write out.
*
* iteration.seriesFlush() is used instead of series.flush()
* because the latter flushes only if data is dirty,
* which causes trouble when the underlying writing function is collective (like PDW)
*
*/
void flushCurrent (bool isBTD) const;
void seriesFlush (bool isBTD) const;

/** This function does initial setup for the fields when the iteration is newly created
* @param[in] meshes The meshes in a series
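A minimal sketch of the flush-target pattern that seriesFlush() and FlushBTDToDisk() rely on (assumptions: openPMD-api with the ADIOS2 BP5 backend; the file name, mesh name, and extent are illustrative, not WarpX code):

#include <openPMD/openPMD.hpp>
#include <vector>

int main ()
{
    auto series = openPMD::Series("btd_sketch_%06T.bp", openPMD::Access::CREATE);
    auto E_x = series.iterations[0].meshes["E"]["x"];
    E_x.resetDataset({openPMD::Datatype::DOUBLE, {8}});

    std::vector<double> chunk(8, 1.0);
    E_x.storeChunkRaw(chunk.data(), {0}, {8});

    // per-buffer flush: stage the data in the ADIOS2 engine buffer only
    series.iterations[0].seriesFlush(
        R"(adios2.engine.preferred_flush_target = "buffer")");

    // occasional aggregated flush: move the buffered data out to disk
    series.iterations[0].seriesFlush(
        R"(adios2.engine.preferred_flush_target = "disk")");

    series.iterations[0].close();
    return 0;
}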
105 changes: 78 additions & 27 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -407,13 +407,16 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()
}
}

void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
void WarpXOpenPMDPlot::seriesFlush (bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);

currIteration.seriesFlush();
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
if (isBTD) {
WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()::BTD");
currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\"");
} else {
WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()()");
currIteration.seriesFlush();
}
}

std::string
@@ -463,6 +466,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const std::string& dirPrefix, int file_m

void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush)
{
WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()");
// default close is true
bool callClose = true;
// close BTD file only when isLastBTDFlush is true
@@ -666,19 +670,32 @@ for (const auto & particle_diag : particle_diags) {
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}
}

void
WarpXOpenPMDPlot::FlushBTDToDisk()
{
bool isBTD = true;
auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);
const bool flattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

if (flattenSteps)
{
// forcing new step so data from each btd batch in
// preferred_flush_target="buffer" can be flushed out
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()");
// Here, for checkpointing purposes, we ask ADIOS to create a new step, which
// triggers writing both data and metadata.
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
else
{
WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()::Disk()");
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
}
}
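
FlushBTDToDisk() above exists in part so that partially written lab-frame stations can be inspected while the run is still going (see the header comment in WarpXOpenPMD.H). A minimal reader sketch under that assumption (openPMD-api; the file-name pattern is hypothetical, not taken from this PR):

#include <openPMD/openPMD.hpp>
#include <iostream>

int main ()
{
    // Open the (possibly still growing) BTD output and list what has already
    // been flushed to disk.
    auto series = openPMD::Series("diags/btd/openpmd_%06T.bp", openPMD::Access::READ_ONLY);
    for (auto const &entry : series.iterations) {
        std::cout << "flushed iteration: " << entry.first << "\n";
    }
    return 0;
}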


void
WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const std::string& name,
@@ -752,7 +769,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
SetConstParticleRecordsEDPIC(currSpecies, positionComponents, NewParticleVectorSize, charge, mass);
}

flushCurrent(isBTD);
this->seriesFlush(isBTD);

// dump individual particles
bool contributed_particles = false; // did the local MPI rank contribute particles?
@@ -833,7 +850,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
}
}

flushCurrent(isBTD);
this->seriesFlush(isBTD);
}

void
@@ -1412,6 +1429,8 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
// collective open
series_iteration.open();

bool hasADIOS = (m_Series->backend() == "ADIOS2");

auto meshes = series_iteration.meshes;
if (first_write_to_iteration) {
// lets see whether full_geom varies from geom[0] xgeom[1]
@@ -1509,26 +1528,58 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
// GPU pointers to the I/O library
#ifdef AMREX_USE_GPU
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
// intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize();
mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size);
} else
if (hasADIOS)
{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()");
auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(
Review comment (ax3l, Member, Oct 14, 2025): Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Review comment (ax3l, Member, Oct 14, 2025): To double check: is Iteration::open() called before these blocks, so the ADIOS engine is definitely open? -- Yes
https://github.com/guj/WarpX/blob/d0bbf79e64beed82b64d29f85ec0a56ed0f5f087/Source/Diagnostics/WarpXOpenPMD.cpp#L1426-L1430

Reply (Contributor Author): Only one rank is writing when using BTD. Other ranks have no data.

chunk_offset, chunk_size,
[&local_box](size_t /* size */) {
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
return data_pinned;
});
auto span = dynamicMemoryView.currentBuffer();
amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
} else
Review comment (Member): Do we need this else branch for D2H? I thought openPMD-api has a fallback implementation for HDF5...?

Reply (Contributor Author): Turns out no. We do need it.

Review comment (Member): That might be an issue, can you please open an issue in https://github.com/openPMD/openPMD-api/ ?

Review comment (Member): @franzpoeschel do you understand why this is needed?

{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H()");
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
// intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize();
mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size);
}
} else
#endif
{
amrex::Real const *local_data = fab.dataPtr(icomp);
mesh_comp.storeChunkRaw(
local_data, chunk_offset, chunk_size);
}
}
{ // CPU
if (hasADIOS)
{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()");
auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(
chunk_offset, chunk_size,
[&local_box](size_t /* size */) {
amrex::BaseFab<amrex::Real> foo(local_box, 1);
std::shared_ptr<amrex::Real> data_pinned(foo.release());
return data_pinned;
});

auto span = dynamicMemoryView.currentBuffer();
std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
}
else
Review comment (Member): Same as above: I thought openPMD-api has a fallback implementation for Span for backends that do not support it?

Reply (Contributor Author): I added this because the first round of the CPU span code failed the pull request checks (which use HDF5): the fallback does not work when collecting a few buffers during a snapshot and then flushing them out. The fallback works if data is flushed out immediately.

{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_mesh()");
amrex::Real const *local_data = fab.dataPtr(icomp);
mesh_comp.storeChunkRaw( local_data, chunk_offset, chunk_size);
}
} // CPU
}
} // icomp store loop

#ifdef AMREX_USE_GPU
amrex::Gpu::streamSynchronize();
#endif
// Flush data to disk after looping over all components
flushCurrent(isBTD);
this->seriesFlush(isBTD);
} // levels loop (i)
}
#endif // WARPX_USE_OPENPMD
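
For reference, a minimal sketch of the span-based storeChunk pattern used in the field loop above (assumptions: openPMD-api with a backend that supports spans, such as ADIOS2; names and extents are illustrative, not WarpX code). The record component hands out a DynamicMemoryView, and the caller fills its currentBuffer() span directly, avoiding a user-side staging copy:

#include <openPMD/openPMD.hpp>
#include <cstring>
#include <vector>

int main ()
{
    auto series = openPMD::Series("fields_sketch_%06T.bp", openPMD::Access::CREATE);
    auto E_x = series.iterations[0].meshes["E"]["x"];
    E_x.resetDataset({openPMD::Datatype::DOUBLE, {16}});

    // Ask the backend for a writable buffer instead of handing it our own pointer.
    auto view = E_x.storeChunk<double>({0}, {16});
    auto span = view.currentBuffer();

    std::vector<double> local(16, 42.0);  // stand-in for the FArrayBox data
    std::memcpy(span.data(), local.data(), local.size() * sizeof(double));

    series.iterations[0].close();  // the span contents are written out on flush
    return 0;
}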