-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
WIP: numa_partitioner for parallel_for. #1461
base: master
Are you sure you want to change the base?
Changes from 14 commits
aa684b6
59253c9
3d1174a
2209ab3
238c892
6edea84
0a01596
6c68575
0407953
1e1d7a1
200675b
52fd000
5ebaf7e
2172632
a8de090
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,6 +179,33 @@ task* start_for<Range, Body, Partitioner>::cancel(execution_data& ed) { | |
return nullptr; | ||
} | ||
|
||
template<typename BasePartitioner> | ||
template<typename Range, typename Body> | ||
void numa_partitioner<BasePartitioner>::execute_for(const Range& range, const Body& body) const{ | ||
if (range.is_divisible() && num_numa_nodes > 1) { | ||
std::vector<Range> subranges; | ||
split_range(range, subranges, num_numa_nodes); | ||
std::vector<oneapi::tbb::task_group> task_groups(num_numa_nodes); | ||
initialize_arena(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the task_arenas be initialized once instead of during each parallel_for execution. I would expect that a partitioner like this could be created and then passed to a number of parallel_fors, amortizing the initialization cost. |
||
|
||
for (std::size_t i = 0; i < num_numa_nodes; ++i) { | ||
arenas[i].execute([&]() { | ||
task_groups[i].run([&, i] { | ||
parallel_for(subranges[i], body, base_partitioner); | ||
}); | ||
}); | ||
} | ||
for (std::size_t i = 0; i < num_numa_nodes; ++i) { | ||
arenas[i].execute([&task_groups, i]() { | ||
task_groups[i].wait(); | ||
}); | ||
} | ||
} | ||
else { | ||
parallel_for(range,body,base_partitioner); | ||
} | ||
} | ||
|
||
//! Calls the function with values from range [begin, end) with a step provided | ||
template<typename Function, typename Index> | ||
class parallel_for_body_wrapper : detail::no_assign { | ||
|
@@ -261,6 +288,15 @@ void parallel_for( const Range& range, const Body& body, affinity_partitioner& p | |
start_for<Range,Body,affinity_partitioner>::run(range,body,partitioner); | ||
} | ||
|
||
//! Parallel iteration over range with numa_partitioner.
/** Thin dispatch overload: delegates to the partitioner's execute_for, which
    splits the range across NUMA-constrained arenas and runs the wrapped base
    partitioner inside each. The partitioner is taken by non-const reference
    for consistency with the affinity_partitioner overload.
    @ingroup algorithms **/
template<typename Range, typename Body, typename T>
__TBB_requires(tbb_range<Range> && parallel_for_body<Body, Range>)
void parallel_for(const Range& range, const Body& body, numa_partitioner<T>& n_partitioner) {
    n_partitioner.execute_for(range, body);
}
|
||
|
||
//! Parallel iteration over range with default partitioner and user-supplied context. | ||
/** @ingroup algorithms **/ | ||
template<typename Range, typename Body> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,9 @@ | |
#include <algorithm> | ||
#include <atomic> | ||
#include <type_traits> | ||
#include <tbb/blocked_range.h> | ||
#include <tbb/blocked_range2d.h> | ||
#include <tbb/blocked_range3d.h> | ||
|
||
#if defined(_MSC_VER) && !defined(__INTEL_COMPILER) | ||
// Workaround for overzealous compiler warnings | ||
|
@@ -67,7 +70,8 @@ class static_partitioner; | |
class affinity_partitioner; | ||
class affinity_partition_type; | ||
class affinity_partitioner_base; | ||
|
||
template <typename T> class numa_partitioner; | ||
|
||
inline std::size_t get_initial_auto_partitioner_divisor() { | ||
const std::size_t factor = 4; | ||
return factor * static_cast<std::size_t>(max_concurrency()); | ||
|
@@ -567,14 +571,15 @@ class affinity_partition_type : public dynamic_grainsize_mode<linear_affinity_mo | |
@ingroup algorithms */ | ||
class simple_partitioner { | ||
public: | ||
simple_partitioner() {} | ||
simple_partitioner() {} | ||
// new implementation just extends existing interface | ||
typedef simple_partition_type task_partition_type; | ||
|
||
private: | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_for; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_scan; | ||
// new implementation just extends existing interface | ||
typedef simple_partition_type task_partition_type; | ||
// TODO: consider to make split_type public | ||
typedef simple_partition_type::split_type split_type; | ||
|
||
|
@@ -594,14 +599,14 @@ class simple_partitioner { | |
class auto_partitioner { | ||
public: | ||
auto_partitioner() {} | ||
// new implementation just extends existing interface | ||
typedef auto_partition_type task_partition_type; | ||
|
||
private: | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_for; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_scan; | ||
// new implementation just extends existing interface | ||
typedef auto_partition_type task_partition_type; | ||
// TODO: consider to make split_type public | ||
typedef auto_partition_type::split_type split_type; | ||
|
||
|
@@ -627,13 +632,15 @@ class auto_partitioner { | |
class static_partitioner { | ||
public: | ||
static_partitioner() {} | ||
// new implementation just extends existing interface | ||
typedef static_partition_type task_partition_type; | ||
|
||
private: | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_for; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_scan; | ||
// new implementation just extends existing interface | ||
typedef static_partition_type task_partition_type; | ||
|
||
// TODO: consider to make split_type public | ||
typedef static_partition_type::split_type split_type; | ||
}; | ||
|
@@ -642,18 +649,63 @@ class static_partitioner { | |
class affinity_partitioner : affinity_partitioner_base { | ||
public: | ||
affinity_partitioner() {} | ||
// new implementation just extends existing interface | ||
typedef affinity_partition_type task_partition_type; | ||
|
||
private: | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_for; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce; | ||
template<typename Range, typename Body, typename Partitioner> friend struct start_scan; | ||
// new implementation just extends existing interface | ||
typedef affinity_partition_type task_partition_type; | ||
|
||
// TODO: consider to make split_type public | ||
typedef affinity_partition_type::split_type split_type; | ||
}; | ||
|
||
template<typename BasePartitioner> | ||
class numa_partitioner { | ||
std::size_t num_numa_nodes; | ||
BasePartitioner& base_partitioner; | ||
|
||
public: | ||
numa_partitioner() : num_numa_nodes(get_number_of_numa_nodes()), base_partitioner() {} | ||
numa_partitioner(BasePartitioner& bp) : num_numa_nodes(get_number_of_numa_nodes()), base_partitioner(bp) {} | ||
|
||
void initialize_arena() const { | ||
for (std::size_t node = 0; node < num_numa_nodes; ++node) { | ||
this->arenas.emplace_back(tbb::task_arena::constraints().set_numa_id(node)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the same instance is used across multiple parallel_fors, won't the arenas vector keep growing? I think initialize would be repeatedly invoked. |
||
} | ||
} | ||
typedef detail::proportional_split split_type; | ||
typedef typename BasePartitioner::task_partition_type task_partition_type; | ||
|
||
template<typename Range, typename Body> | ||
void execute_for(const Range& range, const Body& body) const; | ||
|
||
template<typename Range, typename Body> | ||
void execute_reduce(const Range& range, Body& body) const; | ||
|
||
private: | ||
mutable std::vector<oneapi::tbb::task_arena> arenas; | ||
|
||
// Function to get the number of NUMA nodes in the system | ||
std::size_t get_number_of_numa_nodes() { | ||
return oneapi::tbb::info::numa_nodes().size(); | ||
} | ||
|
||
// Helper function to split a range into multiple subranges | ||
template<typename Range> | ||
void split_range(const Range& range, std::vector<Range>& subranges, std::size_t num_parts) const { | ||
subranges.push_back(range); // Start with the full range | ||
for (std::size_t i = 1; i < num_parts; ++i) { | ||
if (!subranges.back().is_divisible()) break; // If the range is no longer divisible, stop splitting | ||
Range new_range = subranges.back(); // Copy the last range | ||
subranges.back()= Range(new_range, detail::split()); | ||
subranges.push_back(new_range); // Add the new subrange | ||
} | ||
} | ||
}; | ||
|
||
} // namespace d1 | ||
} // namespace detail | ||
|
||
|
@@ -663,6 +715,7 @@ using detail::d1::auto_partitioner; | |
using detail::d1::simple_partitioner; | ||
using detail::d1::static_partitioner; | ||
using detail::d1::affinity_partitioner; | ||
using detail::d1::numa_partitioner; | ||
// Split types | ||
using detail::split; | ||
using detail::proportional_split; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -462,3 +462,60 @@ TEST_CASE("parallel_for constraints") { | |
#if _MSC_VER | ||
#pragma warning (pop) | ||
#endif | ||
|
||
// Define a simple functor to use with parallel_for | ||
struct SimpleFunctor { | ||
void operator()(const tbb::blocked_range<size_t>& r) const { | ||
for (size_t i = r.begin(); i != r.end(); ++i) { | ||
// Perform some operation on the range elements | ||
// For simplicity, we'll just ensure each element is visited | ||
} | ||
} | ||
}; | ||
|
||
//! Smoke test: parallel_for with a numa_partitioner wrapping an
//! affinity_partitioner must run to completion on a simple functor.
void TestNumaPartitionerSimple() {
    const size_t N = 1000;
    // (Removed an unused std::vector<int> fixture that the functor never read.)

    tbb::blocked_range<size_t> range(0, N);
    SimpleFunctor functor;
    tbb::affinity_partitioner ap;
    tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

    // Test parallel_for with numa_partitioner (found via ADL on tbb types).
    parallel_for(range, functor, n_partitioner);

    // The functor has no observable effect; reaching this point without a
    // crash or hang is the check.
    CHECK(true);
}
|
||
void TestNumaPartitionerWithBody() { | ||
const size_t N = 1000; | ||
std::vector<int> vec(N, 0); | ||
|
||
tbb::blocked_range<size_t> range(0, N); | ||
|
||
auto body = [&](const tbb::blocked_range<size_t>& r) { | ||
for (size_t i = r.begin(); i != r.end(); ++i) { | ||
vec[i] = 1; // Set each element to 1 | ||
} | ||
}; | ||
|
||
tbb::affinity_partitioner ap; | ||
tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap); | ||
|
||
// Test parallel_for with numa_partitioner and a lambda body | ||
parallel_for(range, body, n_partitioner); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need a test with more than one parallel_for invocation. I think that would uncover some of the design issues. |
||
|
||
// Verify results | ||
for (size_t i = 0; i < N; ++i) { | ||
CHECK(vec[i] == 1); | ||
} | ||
} | ||
|
||
//! Testing parallel_for with numa_partitioner
//! \brief \ref requirement
// Runs the smoke test (functor body, no observable effect) and the functional
// test (lambda body, per-element verification).
TEST_CASE("NUMA partitioner tests") {
    TestNumaPartitionerSimple();
    TestNumaPartitionerWithBody();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to define a member function of numa_partitioner inside of the parallel_for header? That seems very unusual.