Skip to content

Commit 66af2b4

Browse files
ax3lfranzpoeschel
andauthored
Iteration::open() (#862)
* Test: non-collective, parallel read * Iteration::open() * Enforce strict opening of files in ADIOS2 * use general flush implementation * file_based_write_read: Skip ADIOS1 * Skip ADIOS1 coverage for now, it's blocking the release. Co-authored-by: Franz Pöschel <[email protected]>
1 parent d3041a7 commit 66af2b4

File tree

6 files changed

+137
-1
lines changed

6 files changed

+137
-1
lines changed

docs/source/details/mpi.rst

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Functionality Behavior Description
1919
``Series`` **collective** open and close
2020
``::flush()`` **collective** read and write
2121
``Iteration`` [1]_ independent declare and open
22+
``::open()`` [3]_ **collective** explicit open
2223
``Mesh`` [1]_ independent declare, open, write
2324
``ParticleSpecies`` [1]_ independent declare, open, write
2425
``::setAttribute`` [2]_ *backend-specific* declare, write
@@ -33,6 +34,8 @@ Functionality Behavior Description
3334
.. [2] :ref:`HDF5 <backends-hdf5>` only supports collective attribute definitions/writes; :ref:`ADIOS1 <backends-adios1>` and :ref:`ADIOS2 <backends-adios2>` attributes can be written independently.
3435
If you want to support all backends equally, treat as a collective operation.
3536
37+
.. [3] We usually open iterations delayed on first access. This first access is usually the ``flush()`` call after a ``storeChunk``/``loadChunk`` operation. If the first access is non-collective, an explicit, collective ``Iteration::open()`` can be used to have the files already open.
38+
3639
.. tip::
3740

3841
Just because an operation is independent does not mean it is allowed to be inconsistent.

include/openPMD/Iteration.hpp

+13
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,19 @@ class Iteration : public Attributable
110110
Iteration &
111111
close( bool flush = true );
112112

113+
/** Open an iteration
114+
*
115+
* Explicitly open an iteration.
116+
* Usually, file-open operations are delayed until the first load/storeChunk
117+
* operation is flush-ed. In parallel contexts where it is know that such a
118+
* first access needs to be run non-collectively, one can explicitly open
119+
* an iteration through this collective call.
120+
*
121+
* @return Reference to iteration.
122+
*/
123+
Iteration &
124+
open();
125+
113126
/**
114127
* @brief Has the iteration been closed?
115128
* A closed iteration may not (yet) be reopened.

src/IO/ADIOS/ADIOS2IOHandler.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,10 @@ void ADIOS2IOHandlerImpl::openFile(
417417

418418
writable->written = true;
419419
writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( );
420+
421+
// enforce opening the file
422+
// lazy opening is deathly in parallel situations
423+
getFileData( file );
420424
}
421425

422426
void

src/Iteration.cpp

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "openPMD/Datatype.hpp"
2424
#include "openPMD/Series.hpp"
2525
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
26+
#include "openPMD/auxiliary/Filesystem.hpp"
2627
#include "openPMD/auxiliary/StringManip.hpp"
2728
#include "openPMD/backend/Writable.hpp"
2829

@@ -174,6 +175,23 @@ Iteration::close( bool _flush )
174175
return *this;
175176
}
176177

178+
Iteration &
179+
Iteration::open()
180+
{
181+
Series * s = &auxiliary::deref_dynamic_cast< Series >(
182+
parent->attributable->parent->attributable );
183+
// figure out my iteration number
184+
auto begin = s->indexOf( *this );
185+
auto end = begin;
186+
++end;
187+
// set dirty, so Series::flush will open the file
188+
this->dirty() = true;
189+
s->flush_impl( begin, end );
190+
this->dirty() = false;
191+
192+
return *this;
193+
}
194+
177195
bool
178196
Iteration::closed() const
179197
{

src/binding/python/Iteration.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void init_Iteration(py::module &m) {
4646
.def_property("dt", &Iteration::dt<double>, &Iteration::setDt<double>)
4747
.def_property("dt", &Iteration::dt<long double>, &Iteration::setDt<long double>)
4848
.def_property("time_unit_SI", &Iteration::timeUnitSI, &Iteration::setTimeUnitSI)
49+
.def("open", &Iteration::open)
4950
.def("close", &Iteration::close, py::arg("flush") = true)
5051

5152
// TODO remove in future versions (deprecated)

test/ParallelIOTest.cpp

+98-1
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,103 @@ TEST_CASE( "close_iteration_test", "[parallel]" )
547547
}
548548
}
549549

550+
void
551+
file_based_write_read( std::string file_ending )
552+
{
553+
namespace io = openPMD;
554+
555+
// the iterations we want to write
556+
std::vector< int > iterations = { 10, 30, 50, 70 };
557+
558+
// MPI communicator meta-data and file name
559+
int i_mpi_rank{ -1 }, i_mpi_size{ -1 };
560+
MPI_Comm_rank( MPI_COMM_WORLD, &i_mpi_rank );
561+
MPI_Comm_size( MPI_COMM_WORLD, &i_mpi_size );
562+
unsigned mpi_rank{ static_cast< unsigned >( i_mpi_rank ) },
563+
mpi_size{ static_cast< unsigned >( i_mpi_size ) };
564+
std::string name = "../samples/file_based_write_read_%05T." + file_ending;
565+
566+
// data (we just use the same data for each step for demonstration)
567+
// we assign 10 longitudinal cells & 300 transversal cells per rank here
568+
unsigned const local_Nz = 10u;
569+
unsigned const global_Nz = local_Nz * mpi_size;
570+
unsigned const global_Nx = 300u;
571+
using precision = double;
572+
std::vector< precision > E_x_data( global_Nx * local_Nz );
573+
// filling some values: 0, 1, ...
574+
std::iota( E_x_data.begin(), E_x_data.end(), local_Nz * mpi_rank);
575+
std::transform(E_x_data.begin(), E_x_data.end(), E_x_data.begin(),
576+
[](precision d) -> precision { return std::sin( d * 2.0 * 3.1415 / 20. ); });
577+
578+
{
579+
// open a parallel series
580+
Series series(name, Access::CREATE, MPI_COMM_WORLD);
581+
series.setIterationEncoding(IterationEncoding::fileBased);
582+
583+
int const last_step = 100;
584+
for (int step = 0; step < last_step; ++step) {
585+
MPI_Barrier(MPI_COMM_WORLD);
586+
587+
// is this an output step?
588+
bool const rank_in_output_step =
589+
std::find(iterations.begin(), iterations.end(), step) != iterations.end();
590+
if (!rank_in_output_step) continue;
591+
592+
// now we write (parallel, independent I/O)
593+
auto it = series.iterations[step];
594+
auto E = it.meshes["E"]; // record
595+
auto E_x = E["x"]; // record component
596+
597+
// some meta-data
598+
E.setAxisLabels({"z", "x"});
599+
E.setGridSpacing<double>({1.0, 1.0});
600+
E.setGridGlobalOffset({0.0, 0.0});
601+
E_x.setPosition<double>({0.0, 0.0});
602+
603+
// update values
604+
std::iota(E_x_data.begin(), E_x_data.end(), local_Nz * mpi_rank);
605+
std::transform(E_x_data.begin(), E_x_data.end(), E_x_data.begin(),
606+
[&step](precision d) -> precision {
607+
return std::sin(d * 2.0 * 3.1415 / 100. + step);
608+
});
609+
610+
auto dataset = io::Dataset(
611+
io::determineDatatype<precision>(),
612+
{global_Nx, global_Nz});
613+
E_x.resetDataset(dataset);
614+
615+
Offset chunk_offset = {0, local_Nz * mpi_rank};
616+
Extent chunk_extent = {global_Nx, local_Nz};
617+
E_x.storeChunk(
618+
io::shareRaw(E_x_data),
619+
chunk_offset, chunk_extent);
620+
series.flush();
621+
}
622+
}
623+
624+
// check non-collective, parallel read
625+
{
626+
Series read( name, Access::READ_ONLY, MPI_COMM_WORLD );
627+
Iteration it = read.iterations[ 30 ];
628+
it.open(); // collective
629+
bool isAdios1 = read.backend() == "MPI_ADIOS1"; // FIXME: this is an ADIOS1 backend bug
630+
if( mpi_rank == 0 || isAdios1 ) // non-collective branch (unless ADIOS1)
631+
{
632+
auto E_x = it.meshes["E"]["x"];
633+
auto data = E_x.loadChunk< double >();
634+
read.flush();
635+
}
636+
}
637+
}
638+
639+
TEST_CASE( "file_based_write_read", "[parallel]" )
640+
{
641+
for( auto const & t : getBackends() )
642+
{
643+
file_based_write_read( t );
644+
}
645+
}
646+
550647
void
551648
hipace_like_write( std::string file_ending )
552649
{
@@ -772,4 +869,4 @@ TEST_CASE( "adios2_streaming", "[pseudoserial][adios2]" )
772869
{
773870
adios2_streaming();
774871
}
775-
#endif
872+
#endif

0 commit comments

Comments
 (0)