Merged (changes from 6 commits)
5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H
@@ -53,6 +53,11 @@ public:
private:
/** This is responsible for dumping to file */
std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter;

/** This parameter corresponds to the input option
    "buffer_flush_limit_btd" at the diagnostic level.
    By default, flush every 5 buffers per snapshot. */
int m_NumAggBTDBufferToFlush = 5;
};

#endif // WARPX_FLUSHFORMATOPENPMD_H_
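
For context, a minimal sketch of how this option could be set from a WarpX inputs file. The diagnostic name diag_btd and the surrounding options are illustrative assumptions; only buffer_flush_limit_btd is introduced by this change.

```
# hypothetical back-transformed diagnostic using the openPMD format
diagnostics.diags_names = diag_btd
diag_btd.diag_type = BackTransformed
diag_btd.format = openpmd
diag_btd.openpmd_backend = bp5
# aggregate this many BTD buffers per snapshot before forcing a flush (default: 5)
diag_btd.buffer_flush_limit_btd = 10
```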
5 changes: 5 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp
@@ -64,6 +64,8 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name)
ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg);
encoding = openPMD::IterationEncoding::groupBased;
}

pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush);
}

//
@@ -176,6 +178,9 @@ FlushFormatOpenPMD::WriteToFile (
m_OpenPMDPlotWriter->WriteOpenPMDParticles(
particle_diags, static_cast<amrex::Real>(time), use_pinned_pc, isBTD, isLastBTDFlush);

if (bufferID % m_NumAggBTDBufferToFlush == 0)
m_OpenPMDPlotWriter->ForceFlush(isBTD);

// signal that no further updates will be written to this iteration
m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush);
}
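
As a standalone sketch (not WarpX code) of the flush cadence this modulo check produces with the default limit of 5: ForceFlush fires on buffer IDs 0, 5, 10, and so on, while the buffers in between stay aggregated in the writer.

```cpp
#include <iostream>

int main ()
{
    const int numAggBuffersToFlush = 5;  // mirrors the default of m_NumAggBTDBufferToFlush
    for (int bufferID = 0; bufferID < 12; ++bufferID) {
        if (bufferID % numAggBuffersToFlush == 0) {
            std::cout << "buffer " << bufferID << ": ForceFlush\n";  // 0, 5, 10
        }
    }
    return 0;
}
```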
13 changes: 12 additions & 1 deletion Source/Diagnostics/WarpXOpenPMD.H
@@ -152,6 +152,13 @@ public:
/** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/
std::string OpenPMDFileType () { return m_OpenPMDFileType; }

/** Flush a few BTD buffers in a snapshot
 * @param[in] isBTD if the current diagnostic is BTD
 * This function is controlled by the parameter
 * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (defaults to 5),
 * which can be adjusted in the input file via diag_name.buffer_flush_limit_btd
 */
void ForceFlush (bool isBTD);
private:
void Init (openPMD::Access access, bool isBTD);

@@ -181,7 +188,11 @@ private:
* @param[in] isBTD if the current diagnostic is BTD
*
* if isBTD=false, apply the default flush behaviour
* if isBTD=true, advise to use ADIOS Put() instead of PDW for better performance.
* in ADIOS, the action will be PerformDataWrite
* if isBTD=true, in ADIOS, the action will be PerformPut
* because no action is taken for the span tasks.
Member:

Is this generally true? You only add spans for fields right now, what if particles are involved?

Contributor Author:

You are right, BTD particles will be calling PerformPut. Right now, particles are not the I/O performance bottleneck for BTD; that will only happen with an enormous number of particles. I will revisit this issue in another pull request.

* This way we can aggregate buffers before
* calling ForceFlush(isBTD) to write out.
*
* iteration.seriesFlush() is used instead of series.flush()
* because the latter flushes only if data is dirty
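To make the flush targets concrete, here is a minimal sketch of the per-flush ADIOS2 hints this PR relies on, assuming an already open openPMD::Series backed by ADIOS2 (BP5) and an arbitrary iteration index of 100; only the preferred_flush_target strings are taken from the diff, the rest is illustrative.

```cpp
#include <openPMD/openPMD.hpp>

void sketchFlushTargets (openPMD::Series & series)
{
    openPMD::Iteration it = series.iterations[100];

    // while the buffers of one BTD snapshot accumulate:
    // keep data in the engine's internal buffer instead of writing to disk
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "buffer")");

    // every m_NumAggBTDBufferToFlush buffers, ForceFlush pushes the aggregate out,
    // either as a new ADIOS step (when FlattenSteps is active) or straight to disk
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
}
```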
61 changes: 44 additions & 17 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -409,11 +409,14 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()

void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);

currIteration.seriesFlush();
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
if (isBTD) {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()::BTD");
currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\"");
} else {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()");
currIteration.seriesFlush();
}
}

std::string
@@ -463,6 +466,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const std::string& dirPrefix, int file_m

void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush)
{
WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()");
// default close is true
bool callClose = true;
// close BTD file only when isLastBTDFlush is true
@@ -666,19 +670,32 @@ for (const auto & particle_diag : particle_diags) {
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}
}

void
WarpXOpenPMDPlot::ForceFlush(bool isBTD)
{
if (!isBTD)
return;

auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);
const bool result = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

if (flattenSteps)
if (result)
{
// forcing new step so data from each btd batch in
// preferred_flush_target="buffer" can be flushed out
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
WARPX_PROFILE("WarpXOpenPMDPlot::FlattenSteps()");
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
else
{
WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()::Disk()");
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
}
}


void
WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const std::string& name,
@@ -1509,12 +1526,22 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
// GPU pointers to the I/O library
#ifdef AMREX_USE_GPU
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
// intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize();
mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size);
} else
{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()");
auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(
Member @ax3l (Oct 14, 2025):

Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Member @ax3l (Oct 14, 2025):

To double check: is Iteration::open() called before these blocks, so the ADIOS engine is definitely open? -- Yes
https://github.com/guj/WarpX/blob/d0bbf79e64beed82b64d29f85ec0a56ed0f5f087/Source/Diagnostics/WarpXOpenPMD.cpp#L1426-L1430

Contributor Author:

> Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Only one rank is writing when using BTD. Other ranks have no data.

chunk_offset, chunk_size,
[&local_box](size_t size) {
(void) size;
amrex::Print()<<" span failed \n";
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
return data_pinned;
});

auto span = dynamicMemoryView.currentBuffer();
amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
}
} else
#endif
{
amrex::Real const *local_data = fab.dataPtr(icomp);
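
For context on the span-based branch added above, a minimal sketch of the openPMD-api storeChunk overload with a backend-provided buffer, assuming an already prepared RecordComponent and matching offset/extent (all names here are illustrative, not from the diff). The fallback functor only runs when the backend cannot hand out a span, which mirrors the pinned-buffer fallback in the WarpX code.

```cpp
#include <openPMD/openPMD.hpp>

#include <algorithm>
#include <memory>

void writeZeros (openPMD::RecordComponent & rc,
                 openPMD::Offset const & offset,
                 openPMD::Extent const & extent)
{
    auto view = rc.storeChunk<double>(
        offset, extent,
        [](size_t numElements) {
            // fallback allocation, used only if the backend cannot provide a span
            return std::shared_ptr<double>(
                new double[numElements], std::default_delete<double[]>());
        });

    auto span = view.currentBuffer();           // backend-owned (or fallback) buffer
    std::fill_n(span.data(), span.size(), 0.0); // fill in place, avoiding an extra host copy
    // the data becomes visible to the backend at the next flush of the series
}
```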