3 changes: 3 additions & 0 deletions Docs/source/usage/parameters.rst
@@ -2914,6 +2914,9 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a
``variable based`` is an `experimental feature with ADIOS2 BP5 <https://openpmd-api.readthedocs.io/en/0.16.1/backends/adios2.html#experimental-new-adios2-schema>`__ that will replace ``g``.
Default: ``f`` (full diagnostics)

* ``<diag_name>.buffer_flush_limit_btd`` (`integer`; defaults to 5) optional, only read if ``<diag_name>.diag_type = BackTransformed``
This parameter is used with the ADIOS backend: BTD buffers are grouped and flushed to disk once every N buffers, where N is the value of this parameter (see the example below).
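
For example, an input file might group ten BTD buffers per flush; a minimal sketch, where ``diag1`` is a placeholder diagnostic name and only the last line is the new option::

    diag1.diag_type = BackTransformed
    diag1.format = openpmd
    diag1.buffer_flush_limit_btd = 10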

* ``<diag_name>.adios2_operator.type`` (``zfp``, ``blosc``) optional,
`ADIOS2 I/O operator type <https://openpmd-api.readthedocs.io/en/0.16.1/details/backendconfig.html#adios2>`__ for `openPMD <https://www.openPMD.org>`_ data dumps.

5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H
@@ -53,6 +53,11 @@ public:
private:
/** This is responsible for dumping to file */
std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter;

/** This parameter corresponds to the input option
"buffer_flush_limit_btd" at the diagnostic level.
By default, we flush every 5 buffers per snapshot. */
int m_NumAggBTDBufferToFlush = 5;
};

#endif // WARPX_FLUSHFORMATOPENPMD_H_
5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp
@@ -64,6 +64,8 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name)
ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg);
encoding = openPMD::IterationEncoding::groupBased;
}

pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush);
}

//
@@ -176,6 +178,9 @@ FlushFormatOpenPMD::WriteToFile (
m_OpenPMDPlotWriter->WriteOpenPMDParticles(
particle_diags, static_cast<amrex::Real>(time), use_pinned_pc, isBTD, isLastBTDFlush);

if (isBTD && (bufferID % m_NumAggBTDBufferToFlush == 0) )
m_OpenPMDPlotWriter->FlushBTDToDisk();

// signal that no further updates will be written to this iteration
m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush);
}
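
To make the new flush condition concrete, here is a minimal standalone sketch (plain C++, not WarpX code; the loop bound and printed messages are illustrative only) of the modulo rule used in `WriteToFile` above. With the default limit of 5, BTD buffers 0, 5, 10, ... trigger `FlushBTDToDisk()`; the others keep aggregating in the ADIOS buffer.

```cpp
#include <iostream>

int main ()
{
    // mirrors <diag_name>.buffer_flush_limit_btd (default: 5)
    int const numAggBTDBufferToFlush = 5;

    for (int bufferID = 0; bufferID < 12; ++bufferID) {
        if (bufferID % numAggBTDBufferToFlush == 0) {
            std::cout << "bufferID " << bufferID << ": flush BTD data to disk\n";
        } else {
            std::cout << "bufferID " << bufferID << ": keep aggregating in the ADIOS buffer\n";
        }
    }
    return 0;
}
```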
21 changes: 20 additions & 1 deletion Source/Diagnostics/WarpXOpenPMD.H
@@ -152,6 +152,21 @@ public:
/** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/
std::string OpenPMDFileType () { return m_OpenPMDFileType; }

/** Ensure BTD buffers are written to disk
*
* This function can be called at intermediate points to ensure that ADIOS
* buffered "steps" are written to disk, together with valid metadata when
* checkpointing is required.
*
* This is needed to read partial data while a simulation is running and
* to support restarting (the BTD diagnostics) in WarpX, so it
* can continue to append to a partially written lab-frame station
* after restart.
*
* The frequency is controlled by
* FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (defaults to 5).
* It can be adjusted in the input file via diag_name.buffer_flush_limit_btd.
*/
void FlushBTDToDisk ();
private:
void Init (openPMD::Access access, bool isBTD);

@@ -181,7 +196,11 @@ private:
* @param[in] isBTD if the current diagnostic is BTD
*
* if isBTD=false, apply the default flush behaviour
* if isBTD=true, advice to use ADIOS Put() instead of PDW for better performance.
* in ADIOS, the action will be PerformDataWrite
* if isBTD=true, in ADIOS, the action will be PerformPut
* because no further action is needed for the span-based tasks.

Member:
Is this generally true? You only add spans for fields right now, what if particles are involved?

Contributor Author:
You are right, BTD particles will be calling PerformPut. Right now, particles are not the I/O performance bottleneck for BTD; that will happen when you have an enormous number of particles. I will definitely revisit this issue in another pull request.

* This way we can aggregate buffers before
* calling FlushBTDToDisk() to write them out.
*
* iteration.seriesFlush() is used instead of series.flush()
* because the latter flushes only if data is dirty
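
As background for the `preferred_flush_target` hints used in the implementation below, here is a minimal openPMD-api sketch (assumptions: the ADIOS2 BP backend is available; the file name, iteration index, and record name are placeholders) showing how `Iteration::seriesFlush()` takes a backend hint that either keeps data in the ADIOS buffer or forces it out:

```cpp
#include <openPMD/openPMD.hpp>

#include <vector>

int main ()
{
    auto series = openPMD::Series("diags/example_%T.bp", openPMD::Access::CREATE);
    auto it = series.iterations[100];

    // store a tiny dummy record so there is something to flush
    auto Ex = it.meshes["E"]["x"];
    Ex.resetDataset({openPMD::determineDatatype<double>(), {8}});
    std::vector<double> data(8, 1.0);
    Ex.storeChunk(data, {0}, {8});

    // stage the data in the ADIOS buffer without forcing it to disk yet
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "buffer")");

    // later (e.g. after N BTD buffers): force the aggregated data and
    // metadata out by asking the engine for a new step
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");

    series.close();
    return 0;
}
```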
60 changes: 43 additions & 17 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -409,11 +409,14 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()

void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);

currIteration.seriesFlush();
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
if (isBTD) {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()::BTD");
currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\"");
} else {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()");
currIteration.seriesFlush();
}
}

std::string
@@ -463,6 +466,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const std::string& dirPrefix, int file_m

void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush)
{
WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()");
// default close is true
bool callClose = true;
// close BTD file only when isLastBTDFlush is true
@@ -666,19 +670,32 @@ for (const auto & particle_diag : particle_diags) {
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}
}

void
WarpXOpenPMDPlot::FlushBTDToDisk()
{
bool isBTD = true;
auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);
const bool doFlattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

if (flattenSteps)
if (doFlattenSteps)
{
// forcing new step so data from each btd batch in
// preferred_flush_target="buffer" can be flushed out
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()");
// Here, for checkpointing purposes, we ask ADIOS to create a new step, which
// triggers writing both data and metadata.
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
else
{
WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()::Disk()");
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
}
}


void
WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const std::string& name,
@@ -1509,12 +1526,21 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
// GPU pointers to the I/O library
#ifdef AMREX_USE_GPU
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
// intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize();
mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size);
} else
{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()");
auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(

Member @ax3l (Oct 14, 2025):
Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Member @ax3l (Oct 14, 2025):
To double check: is Iteration::open() called before these blocks, so the ADIOS engine is definitely open? -- Yes
https://github.com/guj/WarpX/blob/d0bbf79e64beed82b64d29f85ec0a56ed0f5f087/Source/Diagnostics/WarpXOpenPMD.cpp#L1426-L1430

Contributor Author:
Only one rank is writing when using BTD. Other ranks have no data.

chunk_offset, chunk_size,
[&local_box](size_t size) {
(void) size;
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
return data_pinned;
});

auto span = dynamicMemoryView.currentBuffer();
amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
}
} else
#endif
{
amrex::Real const *local_data = fab.dataPtr(icomp);
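
For reference, the span-based `storeChunk` pattern introduced above can also be exercised outside of AMReX. A minimal openPMD-api sketch (file and record names, extents, and the fallback allocator are placeholders; a plain `std::fill_n` stands in for `amrex::Gpu::dtoh_memcpy_async`):

```cpp
#include <openPMD/openPMD.hpp>

#include <algorithm>
#include <memory>

int main ()
{
    auto series = openPMD::Series("diags/span_example.bp", openPMD::Access::CREATE);
    auto Ex = series.iterations[0].meshes["E"]["x"];
    Ex.resetDataset({openPMD::determineDatatype<double>(), {16}});

    // Ask the backend for a writable span; the lambda is only invoked if the
    // backend cannot hand out its own buffer and a fallback must be allocated.
    auto dynamicMemoryView = Ex.storeChunk<double>(
        {0}, {16},
        [](size_t size) {
            return std::shared_ptr<double>(new double[size],
                                           std::default_delete<double[]>());
        });

    // Fill the backend-owned (or fallback) buffer in place, then flush.
    auto span = dynamicMemoryView.currentBuffer();
    std::fill_n(span.data(), 16, 1.0);
    series.flush();
    return 0;
}
```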