Skip to content

Commit 5f8b49c

Browse files
committed
map_reduce: prevent mapper or reducer exception from poisoning state
In map_reduce, if the mapper throws, we can end up with poisoned state. This is because the compiler can choose to move s->result before evaluating f.get(), so s->result gets broken (if the mapped type becomes invalid after move). Fix by checking for exceptions from the mapper before calling the reducer, and exceptions from the reducer after calling the reducer, and storing them in the state. If we see an exception, don't bother calling the reducer on failed state. Since the reducer is a pure function (at least in expected usage), and wouldn't have been called on mapper exceptions, we don't lose anything by calling it. A unit test is added for mapper exceptions and for reducer exceptions. The tests do fail with clang before the patch, though if the arguments are evaluated in reverse order, it can pass with a different compiler).
1 parent 0dfee8b commit 5f8b49c

File tree

2 files changed

+70
-9
lines changed

2 files changed

+70
-9
lines changed

include/seastar/core/map_reduce.hh

+16-9
Original file line numberDiff line numberDiff line change
@@ -185,23 +185,30 @@ map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduc
185185
Mapper mapper;
186186
Initial result;
187187
Reduce reduce;
188+
std::exception_ptr ex;
188189
};
189-
auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
190+
auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce), std::exception_ptr()});
190191
future<> ret = make_ready_future<>();
191192
while (begin != end) {
192193
ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
193-
try {
194-
s->result = s->reduce(std::move(s->result), f.get());
195-
return std::move(ret);
196-
} catch (...) {
197-
return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
198-
f.ignore_ready_future();
199-
return make_exception_future<>(ex);
200-
});
194+
if (s->ex) {
195+
f.ignore_ready_future(); // We only report the first exception
196+
} else if (f.failed()) {
197+
s->ex = f.get_exception();
198+
} else {
199+
try {
200+
s->result = s->reduce(std::move(s->result), f.get());
201+
} catch (...) {
202+
s->ex = std::current_exception();
203+
}
201204
}
205+
return std::move(ret);
202206
});
203207
}
204208
return ret.then([s] {
209+
if (s->ex) {
210+
return make_exception_future<Initial>(std::move(s->ex));
211+
}
205212
return make_ready_future<Initial>(std::move(s->result));
206213
});
207214
}

tests/unit/futures_test.cc

+54
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,60 @@ SEASTAR_TEST_CASE(test_map_reduce1_lifetime) {
862862
});
863863
}
864864

865+
SEASTAR_TEST_CASE(map_reduce_with_throwing_mapper) {
866+
try {
867+
auto vec = std::vector<int>{1, 2, 3, 4, 5, 6, 7};
868+
auto ret = co_await map_reduce(
869+
vec,
870+
// Mapper: identity function, but throws
871+
[] (int x) -> future<int> {
872+
if (x == 5) {
873+
throw std::runtime_error("test");
874+
}
875+
co_return x;
876+
},
877+
// Initial value (and accumulator): move-only type
878+
std::make_unique<int>(0),
879+
// Reducer: test that it does not act on a moved-from value
880+
[] (std::unique_ptr<int> acc, int x) -> std::unique_ptr<int> {
881+
BOOST_REQUIRE(bool(acc));
882+
*acc += x;
883+
return acc;
884+
}
885+
);
886+
BOOST_FAIL("should have thrown");
887+
} catch (...) {
888+
// Exception is expected and uninteresting
889+
}
890+
}
891+
892+
SEASTAR_TEST_CASE(map_reduce_with_throwing_reducer) {
893+
try {
894+
auto vec = std::vector<int>{1, 2, 3, 4, 5, 6, 7};
895+
auto ret = co_await map_reduce(
896+
vec,
897+
// Mapper: square function
898+
[] (int x) -> future<int> {
899+
co_return x * x;
900+
},
901+
// Initial value (and accumulator): move-only type
902+
std::make_unique<int>(0),
903+
// Reducer: simple sum, but randomly throws
904+
[] (std::unique_ptr<int> acc, int x) -> std::unique_ptr<int> {
905+
BOOST_REQUIRE(bool(acc));
906+
if (*acc > 14) {
907+
throw std::runtime_error("accumulator overflow, launch missiles");
908+
}
909+
*acc += x;
910+
return acc;
911+
}
912+
);
913+
BOOST_FAIL("should have thrown");
914+
} catch (...) {
915+
// Exception is expected and uninteresting
916+
}
917+
}
918+
865919
// This test doesn't actually test anything - it just waits for the future
866920
// returned by sleep to complete. However, a bug we had in sleep() caused
867921
// this test to fail the sanitizer in the debug build, so this is a useful

0 commit comments

Comments
 (0)