Skip to content

Commit ed3872e

Browse files
committed
Enqueue remote reads in batches
Ref. the meeting with the Maestro team: The fabric should not be bombarded with too many requests at once. Batch size currently hardcoded as 10.
1 parent 74c73f4 commit ed3872e

File tree

3 files changed

+72
-32
lines changed

3 files changed

+72
-32
lines changed

source/adios2/engine/sst/SstReader.cpp

+50-19
Original file line numberDiff line numberDiff line change
@@ -669,28 +669,46 @@ bool SstReader::VariableMinMax(const VariableBase &Var, const size_t Step, MinMa
669669
return m_BP5Deserializer->VariableMinMax(Var, Step, MinMax);
670670
}
671671

672+
void *SstReader::performDeferredReadRemoteMemory(DeferredReadRemoteMemory const &params)
673+
{
674+
return SstReadRemoteMemory(m_Input, params.rank, CurrentStep(), params.payloadStart,
675+
params.payloadSize, params.buffer, params.dp_info);
676+
}
677+
678+
constexpr static size_t BATCH_SIZE = 10;
679+
672680
void SstReader::BP5PerformGets()
673681
{
674682
size_t maxReadSize;
675683
auto ReadRequests = m_BP5Deserializer->GenerateReadRequests(true, &maxReadSize);
676684
std::vector<void *> sstReadHandlers;
677-
for (const auto &Req : ReadRequests)
685+
686+
auto iterator = ReadRequests.cbegin();
687+
auto end = ReadRequests.cend();
688+
while (iterator != end)
678689
{
679-
void *dp_info = NULL;
680-
if (m_CurrentStepMetaData->DP_TimestepInfo)
690+
sstReadHandlers.clear();
691+
size_t counter = 0;
692+
for (; counter < BATCH_SIZE && iterator != end; ++iterator, ++counter)
681693
{
682-
dp_info = m_CurrentStepMetaData->DP_TimestepInfo[Req.WriterRank];
694+
auto const &Req = *iterator;
695+
696+
void *dp_info = NULL;
697+
if (m_CurrentStepMetaData->DP_TimestepInfo)
698+
{
699+
dp_info = m_CurrentStepMetaData->DP_TimestepInfo[Req.WriterRank];
700+
}
701+
auto ret = SstReadRemoteMemory(m_Input, Req.WriterRank, Req.Timestep, Req.StartOffset,
702+
Req.ReadLength, Req.DestinationAddr, dp_info);
703+
sstReadHandlers.push_back(ret);
683704
}
684-
auto ret = SstReadRemoteMemory(m_Input, Req.WriterRank, Req.Timestep, Req.StartOffset,
685-
Req.ReadLength, Req.DestinationAddr, dp_info);
686-
sstReadHandlers.push_back(ret);
687-
}
688-
for (const auto &i : sstReadHandlers)
689-
{
690-
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
705+
for (const auto &i : sstReadHandlers)
691706
{
692-
helper::Throw<std::runtime_error>("Engine", "SstReader", "BP5PerformGets",
693-
"Writer failed before returning data");
707+
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
708+
{
709+
helper::Throw<std::runtime_error>("Engine", "SstReader", "BP5PerformGets",
710+
"Writer failed before returning data");
711+
}
694712
}
695713
}
696714

@@ -709,7 +727,7 @@ void SstReader::PerformGets()
709727
}
710728
else if (m_WriterMarshalMethod == SstMarshalBP)
711729
{
712-
std::vector<void *> sstReadHandlers;
730+
std::vector<DeferredReadRemoteMemory> sstReadHandlers;
713731
std::vector<std::vector<char>> buffers;
714732
size_t iter = 0;
715733

@@ -738,13 +756,26 @@ void SstReader::PerformGets()
738756
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
739757
#undef declare_type
740758
}
741-
// wait for all SstRead requests to finish
742-
for (const auto &i : sstReadHandlers)
759+
// run read requests in batches and wait for them to finish
760+
auto iterator = sstReadHandlers.cbegin();
761+
auto end = sstReadHandlers.cend();
762+
std::vector<void *> enqueuedHandlers;
763+
enqueuedHandlers.reserve(BATCH_SIZE);
764+
while (iterator != end)
743765
{
744-
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
766+
size_t counter = 0;
767+
enqueuedHandlers.clear();
768+
for (; counter < BATCH_SIZE && iterator != end; ++iterator, ++counter)
745769
{
746-
helper::Throw<std::runtime_error>("Engine", "SstReader", "PerformGets",
747-
"Writer failed before returning data");
770+
enqueuedHandlers.push_back(performDeferredReadRemoteMemory(*iterator));
771+
}
772+
for (const auto &i : enqueuedHandlers)
773+
{
774+
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
775+
{
776+
helper::Throw<std::runtime_error>("Engine", "SstReader", "PerformGets",
777+
"Writer failed before returning data");
778+
}
748779
}
749780
}
750781

source/adios2/engine/sst/SstReader.h

+12-1
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,19 @@ class SstReader : public Engine
5454
bool VariableMinMax(const VariableBase &, const size_t Step, MinMaxStruct &MinMax);
5555

5656
private:
57+
struct DeferredReadRemoteMemory
58+
{
59+
size_t rank;
60+
size_t payloadStart;
61+
size_t payloadSize;
62+
char *buffer;
63+
void *dp_info;
64+
};
65+
void * performDeferredReadRemoteMemory(DeferredReadRemoteMemory const &);
66+
5767
template <class T>
58-
void ReadVariableBlocksRequests(Variable<T> &variable, std::vector<void *> &sstReadHandlers,
68+
void ReadVariableBlocksRequests(Variable<T> &variable,
69+
std::vector<DeferredReadRemoteMemory> &sstReadHandlers,
5970
std::vector<std::vector<char>> &buffers);
6071

6172
template <class T>

source/adios2/engine/sst/SstReader.tcc

+10-12
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "adios2/helper/adiosFunctions.h" //GetDataType<T>
1717
#include <adios2-perfstubs-interface.h>
18+
#include <initializer_list>
1819

1920
namespace adios2
2021
{
@@ -25,7 +26,7 @@ namespace engine
2526

2627
template <class T>
2728
void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
28-
std::vector<void *> &sstReadHandlers,
29+
std::vector<DeferredReadRemoteMemory> &sstReadHandlers,
2930
std::vector<std::vector<char>> &buffers)
3031
{
3132
PERFSTUBS_SCOPED_TIMER_FUNC();
@@ -65,9 +66,8 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
6566
std::stringstream ss;
6667
ss << "SST Bytes Read from remote rank " << rank;
6768
PERFSTUBS_SAMPLE_COUNTER(ss.str().c_str(), payloadSize);
68-
auto ret = SstReadRemoteMemory(m_Input, rank, CurrentStep(), payloadStart,
69-
payloadSize, buffer, dp_info);
70-
sstReadHandlers.push_back(ret);
69+
sstReadHandlers.push_back(
70+
DeferredReadRemoteMemory{rank, payloadStart, payloadSize, buffer, dp_info});
7171
}
7272
// if remote data buffer is not compressed
7373
else
@@ -87,10 +87,9 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
8787
subStreamInfo.IntersectionBox, m_BP3Deserializer->m_IsRowMajor,
8888
elementOffset))
8989
{
90-
auto ret = SstReadRemoteMemory(m_Input, rank, CurrentStep(),
91-
writerBlockStart, writerBlockSize,
92-
blockInfo.Data + elementOffset, dp_info);
93-
sstReadHandlers.push_back(ret);
90+
sstReadHandlers.push_back(DeferredReadRemoteMemory{
91+
rank, writerBlockStart, writerBlockSize,
92+
reinterpret_cast<char *>(blockInfo.Data + elementOffset), dp_info});
9493
}
9594
// if either input or output is not contiguous memory then
9695
// find all contiguous parts.
@@ -99,10 +98,9 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
9998
// batch all read requests
10099
buffers.emplace_back();
101100
buffers.back().resize(writerBlockSize);
102-
auto ret =
103-
SstReadRemoteMemory(m_Input, rank, CurrentStep(), writerBlockStart,
104-
writerBlockSize, buffers.back().data(), dp_info);
105-
sstReadHandlers.push_back(ret);
101+
sstReadHandlers.push_back(
102+
DeferredReadRemoteMemory{rank, writerBlockStart, writerBlockSize,
103+
buffers.back().data(), dp_info});
106104
}
107105
}
108106
++threadID;

0 commit comments

Comments
 (0)