Skip to content

Commit dcf18b2

Browse files
committed
Add Serializer to enable serializing tasks that must be called in order
Example use-case for which we need it: in parallel I/O context, calls to H5Open for the same file must be ordered / serialized. All processes must call H5Open in the same order.
1 parent a8f1b8f commit dcf18b2

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#pragma once
2+
#include "lue/framework/core/assert.hpp"
3+
#include <hpx/future.hpp>
4+
#include <boost/container/flat_map.hpp>
5+
#include <concepts>
6+
7+
8+
namespace lue {
9+
10+
/*!
11+
@brief Class for maintaining information that can be used to serialize concurrent tasks
12+
@tparam Key Type for objects to group information by. In case of serializing access to a file,
13+
this could be the type used to represent the name of the file, for example.
14+
@tparam Generation Type to represent the generation / order / count
15+
@warning This class is not thread-safe: call its member functions from a single thread
16+
@warning There is no facility yet to allow instances of this class to shrink again
17+
18+
An example use-case for this class is performing parallel I/O to an HDF5 file. When opening the same
19+
file multiple times, the collective open calls must be serialized. An instance of this class can be
20+
used to achieve this:
21+
22+
- On the root locality, each call to the function that will result in the call to H5Open is associated
23+
with a count, starting with 1
24+
- All tasks that will result in the call to H5Open on the localities are passed in the count. Note
25+
that a task for a higher count can potentially be scheduled to run before a task for a lower count.
26+
This is the problem that needs to be prevented.
27+
- As long as an open call associated with a count before the current one has not finished yet, a task
28+
must not try to open this file. This can be achieved by attaching a continuation to the future
29+
associated with the open call that must finish first.
30+
31+
In code:
32+
33+
@code{.cpp}
34+
Serializer<std::string, Count> open_file_serializer{};
35+
36+
void my_task(std::string const& pathname, Count const count)
37+
{
38+
// The promise is related to us / the current count
39+
hpx::promise<void> promise = open_file_serializer.promise_for(pathname, count);
40+
41+
// The future is related to the one before us, with count count - 1
42+
hpx::future<void> predecessor_future = open_file_serializer.when_predecessor_done(
43+
pathname, count);
44+
45+
hpx::future<Dataset> a_future = predecessor_future.then(
46+
[](hpx::future<void> const& future)
47+
{
48+
// Call H5Open
49+
// ...
50+
51+
// This will allow the next in line to call H5Open
52+
promise.set_value();
53+
54+
// Return open dataset?
55+
// ...
56+
}
57+
);
58+
59+
// a_future will become ready once the call to H5Open has finished
60+
}
61+
@endcode
62+
*/
63+
template<std::equality_comparable Key, std::totally_ordered Generation>
64+
class Serializer
65+
{
66+
67+
public:
68+
69+
/*!
70+
@brief Request a promise associated by a future for the @a key and @a generation passed
71+
in
72+
@warning generation must be larger than zero
73+
74+
The promise returned is related to the future which is related to the @a generation passed in.
75+
It can only be obtained once. Calling this function multiple times for the same generation
76+
will result in promises that are in a valid but unspecified state (they are useless).
77+
78+
It is fine if this function is called for future generations first. That is the point of this
79+
class. It allows to serialize code in a context where calls can not easily be serialized.
80+
81+
The caller is responsible for setting the value of the promise (call set_value()
82+
on it). Otherwise none of the tasks associated with a higher generation will ever be
83+
scheduled.
84+
*/
85+
auto promise_for([[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation)
86+
-> hpx::promise<void>
87+
{
88+
lue_hpx_assert(generation > 0);
89+
90+
// Map will be created if not present already
91+
auto& map{_tuples[key]};
92+
93+
if (!map.contains(generation))
94+
{
95+
// Function to add (promise, future) tuples by generation
96+
auto add_tuple = [&map](Generation const generation) -> void
97+
{
98+
hpx::promise<void> promise{};
99+
hpx::future<void> future{promise.get_future()};
100+
map[generation] = std::make_tuple(std::move(promise), std::move(future));
101+
};
102+
103+
// If the map is empty, we add a first (promise, future) tuple for a first generation (0).
104+
// The promise's value is set immediately so the future is already ready. This way, we can
105+
// return a future in when_predecessor_done for the first generation (1).
106+
if (map.empty())
107+
{
108+
add_tuple(0);
109+
hpx::promise<void>& promise = std::get<0>(map[0]);
110+
lue_hpx_assert(!std::get<1>(map[0]).is_ready());
111+
promise.set_value();
112+
lue_hpx_assert(std::get<1>(map[0]).is_ready());
113+
}
114+
115+
// Add (promise, future) tuples for the current generation passed in and any generations
116+
// missing between the last one added until the current one
117+
for (Generation new_order = map.nth(map.size() - 1)->first + 1; new_order <= generation;
118+
++new_order)
119+
{
120+
add_tuple(new_order);
121+
}
122+
}
123+
124+
lue_hpx_assert(map.contains(generation));
125+
lue_hpx_assert(map.size() > 1);
126+
127+
hpx::promise<void>& promise = std::get<0>(map[generation]);
128+
129+
return std::move(promise);
130+
}
131+
132+
133+
/*!
134+
@brief Return the future associated with the **predecessor** call for the @a key and
135+
@a generation passed in
136+
137+
Attach a continuation to the future returned to serialize access to some resource.
138+
139+
This function can only be called once for a @a key and @a generation. The future returned is
140+
the only one. Subsequent calls will return a future that is in a valid but unspecified state
141+
(it is useless).
142+
*/
143+
auto when_predecessor_done(
144+
[[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation)
145+
-> hpx::future<void>
146+
{
147+
lue_hpx_assert(_tuples.contains(key));
148+
auto& map{_tuples[key]};
149+
lue_hpx_assert(map.contains(generation));
150+
lue_hpx_assert(map.size() > 1);
151+
152+
hpx::future<void>& future = std::get<1>(map[generation - 1]);
153+
lue_hpx_assert(future.valid());
154+
155+
return std::move(future);
156+
}
157+
158+
private:
159+
160+
using Promise = hpx::promise<void>;
161+
162+
using Future = hpx::future<void>;
163+
164+
using FutureTuple = std::tuple<Promise, Future>;
165+
166+
using TupleByGeneration = boost::container::flat_map<Generation, FutureTuple>;
167+
168+
using TupleByGenerationByKey = std::map<Key, TupleByGeneration>;
169+
170+
TupleByGenerationByKey _tuples;
171+
};
172+
173+
} // namespace lue

source/framework/io/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ set(scope lue_framework_io)
22
set(names
33
gdal
44
lue
5+
serializer
56
)
67

78
foreach(name ${names})
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#define BOOST_TEST_MODULE lue framework io serializer
2+
#include "lue/framework/io/serializer.hpp"
3+
#include "lue/framework/test/hpx_unit_test.hpp"
4+
#include <hpx/algorithm.hpp>
5+
#include <random>
6+
7+
8+
BOOST_AUTO_TEST_CASE(variable_raster)
9+
{
10+
// Create a number of tasks that need to be run in the order of their task ID, not the order in which they
11+
// are created or some other order.
12+
13+
using Key = std::string;
14+
using TaskID = std::int32_t;
15+
16+
lue::Serializer<Key, TaskID> task_serializer{};
17+
18+
Key const key = "my_tasks";
19+
TaskID const min_task_id = 1;
20+
TaskID const max_task_id = 100;
21+
22+
std::random_device random_device{};
23+
std::mt19937 random_number_engine{random_device()};
24+
25+
// Create collection of randomly ordered task IDs
26+
std::vector<TaskID> input_task_ids(max_task_id);
27+
std::ranges::iota(input_task_ids, min_task_id);
28+
std::ranges::shuffle(input_task_ids, random_number_engine);
29+
30+
// Each task writes its ID into this collection. If all goes well, all task IDs should end up here,
31+
// ordered from min_task_id to max_task_id.
32+
std::vector<TaskID> output_task_ids{};
33+
34+
// Collection of futures to wait on before we can start testing
35+
std::vector<hpx::future<void>> futures{};
36+
37+
for (TaskID task_id : input_task_ids)
38+
{
39+
// This is how we can mark that we are done accessing some resource
40+
hpx::promise<void> promise = task_serializer.promise_for(key, task_id);
41+
42+
// Create a task that will run after the one with task ID equal to task_id - 1 has finished
43+
futures.push_back(task_serializer.when_predecessor_done(key, task_id)
44+
.then(
45+
[&output_task_ids, task_id, promise = std::move(promise)](
46+
hpx::future<void> const& future) mutable -> auto
47+
{
48+
BOOST_REQUIRE(future.valid());
49+
BOOST_REQUIRE(future.is_ready());
50+
51+
// All threads access this same collection, but since these calls are
52+
// serialized, these accesses won't happen at the same time
53+
output_task_ids.push_back(task_id);
54+
55+
// We are done, next in line can do its thing
56+
promise.set_value();
57+
}));
58+
}
59+
60+
hpx::wait_all(futures.begin(), futures.end());
61+
62+
BOOST_REQUIRE(!output_task_ids.empty());
63+
BOOST_CHECK_EQUAL(output_task_ids.size(), max_task_id - min_task_id + 1);
64+
BOOST_CHECK(std::ranges::is_sorted(output_task_ids));
65+
BOOST_CHECK(std::ranges::adjacent_find(output_task_ids) == output_task_ids.end());
66+
BOOST_CHECK_EQUAL(output_task_ids.front(), 1);
67+
BOOST_CHECK_EQUAL(output_task_ids.back(), max_task_id);
68+
}

0 commit comments

Comments
 (0)