Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions cpp/daal/src/externals/core_threading_win_dll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ FARPROC load_daal_thr_func(const char * ordinal)
typedef void * (*_threaded_malloc_t)(const size_t, const size_t);
typedef void (*_threaded_free_t)(void *);

typedef void (*_daal_threader_for_t)(int, int, const void *, daal::functype);
typedef void (*_daal_threader_for_int64_t)(int64_t, const void *, daal::functype_int64);
typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype);
typedef void (*_daal_threader_for_int32ptr_t)(const int *, const int *, const void *, daal::functype_int32ptr);
typedef void (*_daal_threader_for_simple_t)(int, int, const void *, daal::functype);
typedef void (*_daal_static_threader_for_t)(size_t, const void *, daal::functype_static);
Expand Down Expand Up @@ -179,7 +178,7 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr)
_threaded_free_ptr(ptr);
}

DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func)
DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype func)
{
load_daal_thr_dll();
static _daal_threader_for_t _daal_threader_for_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for");
Expand All @@ -202,13 +201,6 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end,
_daal_threader_for_int32ptr_ptr(begin, end, a, func);
}

DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func)
{
load_daal_thr_dll();
static _daal_threader_for_int64_t _daal_threader_for_int64_ptr = (_daal_threader_for_int64_t)load_daal_thr_func("_daal_threader_for_int64");
_daal_threader_for_int64_ptr(n, a, func);
}

DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func)
{
load_daal_thr_dll();
Expand Down
24 changes: 1 addition & 23 deletions cpp/daal/src/threading/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,7 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo
return 1;
}

DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::functype func)
{
if (daal::threader_env()->getNumberOfThreads() > 1)
{
tbb::parallel_for(tbb::blocked_range<int>(0, n, 1), [&](tbb::blocked_range<int> r) {
int i;
for (i = r.begin(); i < r.end(); i++)
{
func(i, a);
}
});
}
else
{
int i;
for (i = 0; i < n; i++)
{
func(i, a);
}
}
}

DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func)
DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func)
{
if (daal::threader_env()->getNumberOfThreads() > 1)
{
Expand Down
26 changes: 2 additions & 24 deletions cpp/daal/src/threading/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ extern "C"
{
DAAL_EXPORT int _daal_threader_get_max_threads();
DAAL_EXPORT int _daal_threader_get_current_thread_index();
DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func);
DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype func);
DAAL_EXPORT void _daal_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer);
DAAL_EXPORT void _daal_static_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer);
DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func);
DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func);
DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func);
DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func);
Expand Down Expand Up @@ -266,34 +265,13 @@ inline void threader_func_break(int i, bool & needBreak, const void * a)
/// @param[in] reserved Parameter reserved for the future. Currently unused.
/// @param[in] func Callable object that defines the loop body.
template <typename F>
inline void threader_for(int n, int reserved, const F & func)
inline void threader_for(int64_t n, int64_t reserved, const F & func)
{
const void * a = static_cast<const void *>(&func);

_daal_threader_for(n, reserved, a, threader_func<F>);
}

/// Pass a function to be executed in a for loop to the threading layer.
/// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`.
/// The default scheduling of the threading layer is used to assign
/// the iterations of the loop to threads.
/// The iterations of the loop should be logically independent.
/// Data dependencies between the iterations are allowed, but may requre the use
/// of synchronization primitives.
///
/// @tparam F Callable object of type `[/* captures */](int64_t i) -> void`,
/// where `i` is the loop's iteration index, `0 <= i < n`.
///
/// @param[in] n Number of iterations in the for loop.
/// @param[in] func Callable object that defines the loop body.
template <typename F>
inline void threader_for_int64(int64_t n, const F & func)
{
const void * a = static_cast<const void *>(&func);

_daal_threader_for_int64(n, a, threader_func<F>);
}

/// Pass a function to be executed in a for loop to the threading layer.
/// The maximal number of iterations in the loop is 2^31 - 1.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void apply_weights(const pr::ndview<Float, 1>& weights, pr::ndview<Float, 2>& sa
const bk::uniform_blocking blocking(r_count, threading_block);
const auto block_count = blocking.get_block_count();

de::threader_for_int64(block_count, [&](std::int64_t b) -> void {
de::threader_for(block_count, block_count, [&](std::int64_t b) -> void {
const auto f_row = blocking.get_block_start_index(b);
const auto l_row = blocking.get_block_end_index(b);

Expand Down
10 changes: 2 additions & 8 deletions cpp/oneapi/dal/backend/interop/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,13 @@ ONEDAL_EXPORT int _onedal_threader_get_current_thread_index() {
return _daal_threader_get_current_thread_index();
}

ONEDAL_EXPORT void _onedal_threader_for(std::int32_t n,
std::int32_t threads_request,
ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n,
std::int64_t threads_request,
const void *a,
oneapi::dal::preview::functype func) {
_daal_threader_for(n, threads_request, a, static_cast<daal::functype>(func));
}

ONEDAL_EXPORT void _onedal_threader_for_int64(std::int64_t n,
const void *a,
oneapi::dal::preview::functype_int64 func) {
_daal_threader_for_int64(n, a, static_cast<daal::functype_int64>(func));
}

ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n,
std::int32_t threads_request,
const void *a,
Expand Down
19 changes: 4 additions & 15 deletions cpp/oneapi/dal/detail/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ ONEDAL_EXPORT int _onedal_threader_get_max_threads();

ONEDAL_EXPORT int _onedal_threader_get_current_thread_index();

ONEDAL_EXPORT void _onedal_threader_for(std::int32_t n,
std::int32_t threads_request,
ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n,
std::int64_t threads_request,
const void *a,
oneapi::dal::preview::functype func);

ONEDAL_EXPORT void _onedal_threader_for_int64(std::int64_t n,
const void *a,
oneapi::dal::preview::functype_int64 func);

ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n,
std::int32_t threads_request,
const void *a,
Expand Down Expand Up @@ -155,21 +151,14 @@ inline void threader_func_blocked_size(std::size_t f, std::size_t l, const void
}

template <typename F>
inline ONEDAL_EXPORT void threader_for(std::int32_t n,
std::int32_t threads_request,
inline ONEDAL_EXPORT void threader_for(std::int64_t n,
std::int64_t threads_request,
const F &lambda) {
const void *a = static_cast<const void *>(&lambda);

_onedal_threader_for(n, threads_request, a, threader_func<F>);
}

template <typename F>
inline ONEDAL_EXPORT void threader_for_int64(std::int64_t n, const F &lambda) {
const void *a = static_cast<const void *>(&lambda);

_onedal_threader_for_int64(n, a, threader_func_int64<F>);
}

template <typename F>
inline ONEDAL_EXPORT void threader_for_simple(std::int32_t n,
std::int32_t threads_request,
Expand Down
26 changes: 13 additions & 13 deletions cpp/oneapi/dal/io/csv/detail/read_graph_kernel_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ template <typename Graph>
struct collect_degrees_from_edge_list<Graph, /* IsDirected = */ false> {
template <typename EdgeList, typename AtomicType>
auto operator()(const EdgeList &edges, AtomicType *degrees_cv) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
++degrees_cv[std::get<0>(edges[u])];
++degrees_cv[std::get<1>(edges[u])];
});
Expand All @@ -168,7 +168,7 @@ template <typename Graph>
struct collect_degrees_from_edge_list<Graph, /* IsDirected = */ true> {
template <typename EdgeList, typename AtomicType>
auto operator()(const EdgeList &edges, AtomicType *degrees_cv) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
++degrees_cv[std::get<0>(edges[u])];
});
}
Expand Down Expand Up @@ -227,7 +227,7 @@ ONEDAL_EXPORT std::int64_t compute_prefix_sum<std::int64_t, std::int32_t>(

template <typename Index, typename AtomicIndex>
void fill_from_atomics(Index *arr, AtomicIndex *atomic_arr, std::int64_t elements_count) {
dal::detail::threader_for_int64(elements_count, [&](std::int64_t n) {
dal::detail::threader_for(elements_count, elements_count, [&](std::int64_t n) {
arr[n] = atomic_arr[n].load();
});
}
Expand All @@ -241,7 +241,7 @@ struct fill_unfiltered_neighs<Graph, /* IsDirected = */ false> {
auto operator()(const edge_list<Vertex> &edges,
AtomicEdge *rows_vec_atomic,
Vertex *unfiltered_neighs) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
unfiltered_neighs[++rows_vec_atomic[edges[u].first] - 1] = edges[u].second;
unfiltered_neighs[++rows_vec_atomic[edges[u].second] - 1] = edges[u].first;
});
Expand All @@ -251,7 +251,7 @@ struct fill_unfiltered_neighs<Graph, /* IsDirected = */ false> {
auto operator()(const weighted_edge_list<Vertex, Weight> &edges,
AtomicEdge *rows_vec_atomic,
std::pair<Vertex, Weight> *unfiltered_neighs_vals) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
unfiltered_neighs_vals[++rows_vec_atomic[std::get<0>(edges[u])] - 1] =
std::make_pair(std::get<1>(edges[u]), std::get<2>(edges[u]));
unfiltered_neighs_vals[++rows_vec_atomic[std::get<1>(edges[u])] - 1] =
Expand All @@ -266,7 +266,7 @@ struct fill_unfiltered_neighs<Graph, /* IsDirected = */ true> {
auto operator()(const edge_list<Vertex> &edges,
AtomicEdge *rows_vec_atomic,
Vertex *unfiltered_neighs) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
unfiltered_neighs[++rows_vec_atomic[edges[u].first] - 1] = edges[u].second;
});
}
Expand All @@ -275,7 +275,7 @@ struct fill_unfiltered_neighs<Graph, /* IsDirected = */ true> {
auto operator()(const weighted_edge_list<Vertex, Weight> &edges,
AtomicEdge *rows_vec_atomic,
std::pair<Vertex, Weight> *unfiltered_neighs_vals) {
dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) {
dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) {
unfiltered_neighs_vals[++rows_vec_atomic[std::get<0>(edges[u])] - 1] =
std::make_pair(std::get<1>(edges[u]), std::get<2>(edges[u]));
});
Expand All @@ -289,7 +289,7 @@ void fill_filtered_neighs(const EdgeIndex *unfiltered_offsets,
const EdgeIndex *filtered_offsets,
VertexIndex *filtered_neighs,
std::int64_t vertex_count) {
dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) {
auto u_neighs = filtered_neighs + filtered_offsets[u];
auto u_neighs_unf = unfiltered_neighs + unfiltered_offsets[u];
for (VertexIndex i = 0; i < filtered_degrees[u]; i++) {
Expand All @@ -306,7 +306,7 @@ void fill_filtered_neighs(const Edge *unfiltered_offsets,
Vertex *filtered_neighs,
Weight *filtered_vals,
std::int64_t vertex_count) {
dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) {
auto u_neighs = filtered_neighs + filtered_offsets[u];
auto u_neighs_vals = filtered_vals + filtered_offsets[u];
auto u_neighs_unf = unfiltered_neighs_vals + unfiltered_offsets[u];
Expand All @@ -332,7 +332,7 @@ void filter_neighbors_and_fill_new_degrees(VertexIndex *unfiltered_neighs,
VertexIndex *new_degrees,
std::int64_t vertex_count) {
//removing self-loops, multiple edges from graph, and make neighbors in CSR sorted
dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) {
auto start_p = unfiltered_neighs + unfiltered_offsets[u];
auto end_p = unfiltered_neighs + unfiltered_offsets[u + 1];

Expand All @@ -350,7 +350,7 @@ void filter_neighbors_and_fill_new_degrees(std::pair<Vertex, Weight> *unfiltered
Vertex *new_degrees,
std::int64_t vertex_count) {
//removing self-loops, multiple edges from graph, and make neighbors in CSR sorted
dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) {
auto start_p = unfiltered_neighs_vals + unfiltered_offsets[u];
auto end_p = unfiltered_neighs_vals + unfiltered_offsets[u + 1];

Expand Down Expand Up @@ -478,7 +478,7 @@ void convert_to_csr_impl(const edge_list<typename graph_traits<Graph>::vertex_ty
auto edge_offsets_tup =
ra_vertex_edge.template allocate_array<vertex_edge_set_t>(vertex_count + 1);
auto rows_vertex = std::get<1>(edge_offsets_tup);
dal::detail::threader_for_int64(vertex_count + 1, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count + 1, vertex_count + 1, [&](std::int64_t u) {
rows_vertex[u] = static_cast<vertex_edge_t>(edge_offsets_data[u]);
});

Expand Down Expand Up @@ -601,7 +601,7 @@ void convert_to_csr_impl(
auto edge_offsets_tup =
ra_vertex_edge.template allocate_array<vertex_edge_set_t>(vertex_count + 1);
auto rows_vertex = std::get<1>(edge_offsets_tup);
dal::detail::threader_for_int64(vertex_count + 1, [&](std::int64_t u) {
dal::detail::threader_for(vertex_count + 1, vertex_count + 1, [&](std::int64_t u) {
rows_vertex[u] = static_cast<vertex_edge_t>(edge_offsets_data[u]);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void copy_convert(const detail::host_policy& policy,
const std::int64_t row_count = shape.first;
const std::int64_t col_count = shape.second;

detail::threader_for_int64(row_count, [&](std::int64_t i) -> void {
detail::threader_for(row_count, row_count, [&](std::int64_t i) -> void {
auto* out_raw_ptr = out_ptrs[i];
const auto* inp_raw_ptr = inp_ptrs[i];

Expand Down
Loading