Skip to content

Commit 7186526

Browse files
committed
[df] Add another test for RDF and explicit multithreading
For the following scenario: * Multiple threads, multiple different RDF computation graphs (one per thread). * Some threads run a valid, working computation graph, others have some problems. * Two classes of problems are used. The first is when the declaration of some code to JIT incurs in an error in the interpreter. The second is when there is an exception at runtime. * All problems are hidden explicitly by a try-catch scope. * Threads running a working computation graph should not be impacted by the threads that are running a flawed computation graph.
1 parent b6f0157 commit 7186526

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

Diff for: tree/dataframe/test/dataframe_concurrency.cxx

+81
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <ROOT/RDF/RInterface.hxx>
44
#include <ROOT/TThreadExecutor.hxx>
55
#include <atomic>
6+
#include <condition_variable>
7+
#include <mutex>
68
#include <numeric>
79
#include <string>
810
#include <thread>
@@ -274,3 +276,82 @@ TEST(RDFConcurrency, JITWithManyThreads)
274276

275277
expect_vec_eq(results, expected);
276278
}
279+
280+
TEST(RDFConcurrency, JITManyThreadsAndExceptions)
281+
{
282+
ROOT::EnableThreadSafety();
283+
284+
std::condition_variable cv_1;
285+
std::condition_variable cv_2;
286+
std::condition_variable cv_3;
287+
std::mutex m_1;
288+
std::mutex m_2;
289+
std::mutex m_3;
290+
bool ready_1{false};
291+
bool ready_2{false};
292+
bool ready_3{false};
293+
294+
int res_work_2{};
295+
int res_work_3{};
296+
297+
auto work_1 = [&]() {
298+
std::lock_guard lk_1{m_1};
299+
try {
300+
ROOT::RDataFrame df{1};
301+
auto df1 = df.Define("work_1_x", "throw std::runtime_error(\"Error in RDF!\"); return 42;");
302+
auto v = df1.Sum<int>("work_1_x").GetValue();
303+
} catch (const std::runtime_error &) {
304+
}
305+
ready_1 = true;
306+
cv_1.notify_all();
307+
};
308+
309+
auto work_2 = [&]() {
310+
std::unique_lock lk_1(m_1);
311+
std::lock_guard lk_2{m_2};
312+
std::unique_lock lk_3{m_3};
313+
ROOT::RDataFrame df{1};
314+
auto df1 = df.Define("work_2_x", "42");
315+
cv_1.wait(lk_1, [&ready_1] { return ready_1; });
316+
auto df2 = df1.Define("work_2_y", "58");
317+
cv_3.wait(lk_3, [&ready_3] { return ready_3; });
318+
auto df3 = df2.Define("work_2_z", "work_2_x + work_2_y");
319+
res_work_2 = df3.Sum<int>("work_2_z").GetValue();
320+
ready_2 = true;
321+
cv_2.notify_one();
322+
};
323+
324+
auto work_3 = [&]() {
325+
std::unique_lock lk_1(m_1);
326+
std::unique_lock lk_2(m_2);
327+
std::unique_lock lk_3(m_3);
328+
ROOT::RDataFrame df{1};
329+
auto df1 = df.Define("work_3_x", "11");
330+
auto df2 = df1.Define("work_3_y", "work_3_x * 2");
331+
cv_1.wait(lk_1, [&ready_1] { return ready_1; });
332+
cv_2.wait(lk_2, [&ready_2] { return ready_2; });
333+
auto df3 = df2.Define("work_3_z", "work_3_y + work_3_x");
334+
cv_3.wait(lk_3, [&ready_3] { return ready_3; });
335+
auto df4 = df3.Define("work_3_fin", "work_3_x + work_3_y + work_3_z");
336+
res_work_3 = df4.Sum<int>("work_3_fin").GetValue();
337+
};
338+
339+
auto work_4 = [&]() {
340+
std::lock_guard lk_3{m_3};
341+
try {
342+
ROOT::RDataFrame df{1};
343+
auto df1 = df.Define("x", "rndm");
344+
} catch (const std::runtime_error &) {
345+
}
346+
ready_3 = true;
347+
cv_3.notify_all();
348+
};
349+
350+
std::array<std::thread, 4> threads{std::thread{work_1}, std::thread{work_2}, std::thread{work_3},
351+
std::thread{work_4}};
352+
for (auto &&t : threads)
353+
t.join();
354+
355+
EXPECT_EQ(res_work_2, 100);
356+
EXPECT_EQ(res_work_3, 66);
357+
}

0 commit comments

Comments
 (0)