1 change: 1 addition & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H
@@ -53,6 +53,7 @@ public:
private:
/** This is responsible for dumping to file */
std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter;
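/** Number of BTD buffers to aggregate before a forced backend flush (default 5); set via <diag>.buffer_flush_limit_btd */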
int m_NumAggBTDBufferToFlush = 5;
};

#endif // WARPX_FLUSHFORMATOPENPMD_H_
6 changes: 6 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp
@@ -64,6 +64,9 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name)
ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg);
encoding = openPMD::IterationEncoding::groupBased;
}

pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush);
amrex::Print() << " BTD: ForceFlushEvery: " << m_NumAggBTDBufferToFlush << " Buffers per snapshot" << std::endl;
}
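For reference, a hypothetical WarpX input-deck snippet using the new parameter; the diagnostic name diag_btd and the value 10 are illustrative, and the first two lines are the usual openPMD diagnostic settings:

diag_btd.format = openpmd
diag_btd.openpmd_backend = bp5
# flush the aggregated BTD buffers every 10 buffers instead of the default 5
diag_btd.buffer_flush_limit_btd = 10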

//
@@ -176,6 +179,9 @@ FlushFormatOpenPMD::WriteToFile (
m_OpenPMDPlotWriter->WriteOpenPMDParticles(
particle_diags, static_cast<amrex::Real>(time), use_pinned_pc, isBTD, isLastBTDFlush);

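// aggregate BTD buffers: only every m_NumAggBTDBufferToFlush-th buffer triggers a
// forced backend flush (ForceFlush is a no-op for non-BTD output)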
if (bufferID % m_NumAggBTDBufferToFlush == 0)
m_OpenPMDPlotWriter->ForceFlush(isBTD);

// signal that no further updates will be written to this iteration
m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush);
}
1 change: 1 addition & 0 deletions Source/Diagnostics/WarpXOpenPMD.H
@@ -152,6 +152,7 @@ public:
/** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/
std::string OpenPMDFileType () { return m_OpenPMDFileType; }

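/** Force aggregated BTD data out of the I/O backend.
 *
 * With the ADIOS2 backend and the FlattenSteps option this flushes to a new step,
 * otherwise it flushes to disk; it is a no-op when isBTD is false.
 */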
void ForceFlush(bool isBTD);
private:
void Init (openPMD::Access access, bool isBTD);

78 changes: 61 additions & 17 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -407,13 +407,27 @@
}
}

/*
 * When I/O goes through ADIOS2:
 *   isBTD == true  => flush only into the engine buffer (PerformPuts).
 *     This way we do not flush out every buffer of a snapshot individually; BTD uses
 *     only a few data-producing ranks, which makes ADIOS collective writes costly.
 *     Instead we aggregate a few buffers before calling ForceFlush(isBTD) to write out.
 *     Note that the ADIOS Span API is used to allocate CPU data, so when the Span
 *     allocation succeeds PerformPuts has nothing left to do.
 *   isBTD == false => flush to disk (PerformDataWrite, PDW).
 */
void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

    openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
    if (isBTD) {
        WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent-PP()");
        currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" );
    } else {
        WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()");
        currIteration.seriesFlush();
    }
}
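Outside the diff, for illustration only: a minimal openPMD-api sketch of the two-stage flush described above, assuming a build with ADIOS2; the file name, mesh name, and extents are made up.

#include <openPMD/openPMD.hpp>
#include <vector>

int main ()
{
    openPMD::Series series("sketch_%T.bp", openPMD::Access::CREATE);
    auto it = series.iterations[0];
    auto E_x = it.meshes["E"]["x"];
    E_x.resetDataset(openPMD::Dataset(openPMD::determineDatatype<double>(), {8}));

    std::vector<double> buffer(8, 1.0);
    E_x.storeChunk(buffer, {0}, {8});

    // cheap: stage the data inside the ADIOS2 engine buffer, no collective write yet
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "buffer")");

    // ... chunks from later BTD buffers would be staged the same way ...

    // expensive: force the aggregated data out (what ForceFlush does every N buffers)
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");

    it.close();
    return 0;
}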

std::string
@@ -463,6 +477,7 @@

void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush)
{
WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()");
// default close is true
bool callClose = true;
// close BTD file only when isLastBTDFlush is true
@@ -666,6 +681,37 @@
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}
}

/*
 * Flush the BTD buffers aggregated for a snapshot.
 * The aggregation count is controlled by FlushFormatOpenPMD::m_NumAggBTDBufferToFlush
 * (default 5) and can be adjusted in the input file via <diag>.buffer_flush_limit_btd.
 */
void
WarpXOpenPMDPlot::ForceFlush (bool isBTD)
{
    if (!isBTD)
        return;

    auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
    const bool result = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

    if (result)
    {
        // forcing a new step so data from each BTD batch staged with
        // preferred_flush_target="buffer" can be flushed out
        WARPX_PROFILE("WarpXOpenPMDPlot::FlattenSteps()");
        openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
        currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
    }
    else
    {
        WARPX_PROFILE("WarpXOpenPMDPlot::PDW()");
        openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
        currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
    }
}


void
WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const std::string& name,
@@ -1473,6 +1506,7 @@
}
} // icomp setup loop

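// tracks whether the openPMD backend provided a writable Span into its own buffer;
// the fallback allocator below sets it to false when pinned host memory is used instead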
bool spanWorks = true;
for ( int icomp=0; icomp<ncomp; icomp++ ) {
std::string const & varname = varnames[icomp];

@@ -1509,12 +1543,22 @@
// GPU pointers to the I/O library
#ifdef AMREX_USE_GPU
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
    {
        WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()");
        auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(
            chunk_offset, chunk_size,
            [&local_box, &spanWorks](size_t size) {
                (void) size;
                spanWorks = false;
                amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
                std::shared_ptr<amrex::Real> data_pinned(foo.release());
                return data_pinned;
            });

        auto span = dynamicMemoryView.currentBuffer();
        amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
    }
} else
#endif
{
    amrex::Real const *local_data = fab.dataPtr(icomp);

Review thread on the storeChunk call above:

@ax3l (Member), Oct 14, 2025:
Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

@ax3l (Member), Oct 14, 2025:
To double check: is Iteration::open() called before these blocks, so the ADIOS engine is definitely open? -- Yes
https://github.com/guj/WarpX/blob/d0bbf79e64beed82b64d29f85ec0a56ed0f5f087/Source/Diagnostics/WarpXOpenPMD.cpp#L1426-L1430

Author (Contributor) reply:
> Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Only one rank is writing when using BTD. Other ranks have no data.
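Also for illustration only (not part of the PR): a self-contained sketch of the span-based storeChunk pattern used in the hunk above, with a plain host-memory fallback allocator standing in for the pinned BaseFab; file, mesh, and extent are illustrative and openPMD-api >= 0.14 is assumed.

#include <openPMD/openPMD.hpp>
#include <memory>
#include <numeric>

int main ()
{
    openPMD::Series series("span_sketch_%T.bp", openPMD::Access::CREATE);
    auto E_x = series.iterations[0].meshes["E"]["x"];
    E_x.resetDataset(openPMD::Dataset(openPMD::determineDatatype<double>(), {16}));

    // ask the backend for a writable view into its own buffer; if it cannot provide
    // one, the lambda allocates host memory instead (WarpX uses a pinned BaseFab
    // here and records the fallback in spanWorks)
    auto view = E_x.storeChunk<double>(
        {0}, {16},
        [](size_t n) {
            return std::shared_ptr<double>(new double[n], std::default_delete<double[]>());
        });

    auto span = view.currentBuffer();
    std::iota(span.data(), span.data() + span.size(), 0.0); // fill in place, no staging copy

    series.flush();
    return 0;
}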