Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5acdb34
change parallel_for to cpuParallel->parallel_for
sunxiaoxia2022 Dec 11, 2025
f6aafcf
change some parallel_for to auto
sunxiaoxia2022 Dec 16, 2025
8ed2657
add some parallel to cpu_parallel
sunxiaoxia2022 Dec 21, 2025
d4dca59
update parallel_for in dft
sunxiaoxia2022 Dec 22, 2025
feb4f8c
update parallel in color_convert
sunxiaoxia2022 Jan 5, 2026
7065083
fix conflict
sunxiaoxia2022 Jan 5, 2026
2137288
fix code style
sunxiaoxia2022 Jan 6, 2026
5afceb3
code style
sunxiaoxia2022 Jan 6, 2026
a23d9db
fix Clang-tidy issue
sunxiaoxia2022 Jan 6, 2026
42a9f1c
Clang-tidy issue
sunxiaoxia2022 Jan 6, 2026
27c9e48
Clang-tidy
sunxiaoxia2022 Jan 6, 2026
e602707
Clang-tidy
sunxiaoxia2022 Jan 6, 2026
494bb9d
clang-tidy
sunxiaoxia2022 Jan 6, 2026
9be399f
clang-tidy
sunxiaoxia2022 Jan 6, 2026
20fd0c2
fix test failure
sunxiaoxia2022 Jan 7, 2026
8334f83
add const
sunxiaoxia2022 Jan 11, 2026
5666a4c
pass cpuParallel to the execute method
sunxiaoxia2022 Jan 17, 2026
132b5f1
fix code style
sunxiaoxia2022 Jan 17, 2026
38fe348
fix ci issue
sunxiaoxia2022 Jan 17, 2026
ad0e817
clang issue
sunxiaoxia2022 Jan 17, 2026
f0e10d9
fix code style
sunxiaoxia2022 Jan 17, 2026
8c13666
fix clang-tidy issue
sunxiaoxia2022 Jan 18, 2026
a669ede
Merge branch 'master' into xiaoxia/parallel_for_auto_2.0
sunxiaoxia2022 Jan 18, 2026
ad33939
Merge branch 'master' into xiaoxia/parallel_for_auto_2.0
sunxiaoxia2022 Jan 19, 2026
62b967f
move CpuParallelPtr into ov::intel_cpu namespace
sunxiaoxia2022 Feb 11, 2026
97b6b39
Merge branch 'master' into xiaoxia/parallel_for_auto_2.0
sunxiaoxia2022 Feb 11, 2026
926dfe0
fix conflict
sunxiaoxia2022 Feb 12, 2026
92014bc
arm issue
sunxiaoxia2022 Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/plugins/intel_cpu/src/cpu_memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <oneapi/dnnl/dnnl_common.hpp>
#include <vector>

#include "cpu_parallel.hpp"
#include "cpu_types.h"
#include "memory_desc/blocked_memory_desc.h"
#include "memory_desc/cpu_memory_desc.h"
Expand All @@ -31,7 +32,6 @@
#include "nodes/common/cpu_memcpy.h"
#include "nodes/reorder.h"
#include "openvino/core/except.hpp"
#include "openvino/core/parallel.hpp"
#include "openvino/core/type/bfloat16.hpp"
#include "openvino/core/type/element_type.hpp"
#include "openvino/runtime/system_conf.hpp"
Expand Down Expand Up @@ -693,6 +693,7 @@ MemoryPtr split_vertical(const dnnl::engine& eng,
int dim,
int w_rank,
int w_size,
const std::shared_ptr<CpuParallel>& cpu_parallel,
bool need_fill) {
auto desc = src->getDescPtr();
const auto& shape = src->getShape();
Expand Down Expand Up @@ -747,7 +748,7 @@ MemoryPtr split_vertical(const dnnl::engine& eng,
strideSize /= 2;
copySize /= 2;
}
parallel_for(step, [&](int i) {
cpu_parallel->parallel_for(step, [&](int i) {
int dst_offset = i * copySize;
int src_offset = i * splited_size + w_rank * strideSize;
cpu_parallel_memcpy(dstPtr + dst_offset, srcPtr + src_offset, copySize);
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/intel_cpu/src/cpu_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <unordered_set>
#include <utility>

#include "cpu_parallel.hpp"
#include "cpu_types.h"
#include "dnnl_extension_utils.h"
#include "memory_desc/cpu_memory_desc.h"
Expand Down Expand Up @@ -459,6 +460,7 @@ MemoryPtr split_vertical(const dnnl::engine& eng,
int dim,
int w_rank,
int w_size,
const std::shared_ptr<CpuParallel>& cpu_parallel,
bool need_fill = true);

} // namespace ov::intel_cpu
100 changes: 100 additions & 0 deletions src/plugins/intel_cpu/src/cpu_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ class CpuParallel {
[[nodiscard]] R parallel_sum(const T0& D0, const R& input, const F& func) const {
return cpu_parallel_sum(D0, input, func);
}
// Parallel 2D reduction: sums func(d0, d1) over the iteration space
// [0, D0) x [0, D1), seeded with `input`. Thin forwarder to the
// backend-specific cpu_parallel_sum2d implementation below.
template <typename T0, typename T1, typename R, typename F>
[[nodiscard]] R parallel_sum2d(const T0& D0, const T1& D1, const R& input, const F& func) const {
    return cpu_parallel_sum2d(D0, D1, input, func);
}
// Parallel 3D reduction: sums func(d0, d1, d2) over
// [0, D0) x [0, D1) x [0, D2), seeded with `input`. Thin forwarder to the
// backend-specific cpu_parallel_sum3d implementation.
// NOTE(review): unlike parallel_sum2d and the other wrappers, this method is
// not const-qualified — it looks like `const` was simply omitted. It cannot
// be added here until cpu_parallel_sum3d is also const-qualified; confirm
// and align both with the sibling methods.
template <typename T0, typename T1, typename T2, typename R, typename F>
[[nodiscard]] R parallel_sum3d(const T0& D0, const T1& D1, const T2& D2, const R& input, const F& func) {
    return cpu_parallel_sum3d(D0, D1, D2, input, func);
}
template <typename T0, typename F>
void parallel_for(const T0& D0, const F& func) const {
cpu_parallel_for(D0, func);
Expand Down Expand Up @@ -142,6 +150,98 @@ class CpuParallel {
#endif
}

template <typename T0, typename T1, typename R, typename F>
[[nodiscard]] R cpu_parallel_sum2d(const T0& D0, const T1& D1, const R& input, const F& func) const {
#if OV_THREAD == OV_THREAD_TBB_ADAPTIVE
R res_sum = 0;
if (m_partitioner == ov::intel_cpu::TbbPartitioner::AUTO) {
res_sum = _TBB_REDUCE_FUNC(
tbb::blocked_range2d<T0, T1>(0, D0, 0, D1),
input,
[&](const tbb::blocked_range2d<T0, T1>& r, R init) -> R {
R sum = init;
for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
sum += func(dim2, dim1);
}
}
return sum;
},
[](R x, R y) -> R {
return x + y;
});
} else {
res_sum = _TBB_REDUCE_FUNC(
tbb::blocked_range2d<T0, T1>(0, D0, 0, D1),
input,
[&](const tbb::blocked_range2d<T0, T1>& r, R init) -> R {
R sum = init;
for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
sum += func(dim2, dim1);
}
}
return sum;
},
[](R x, R y) -> R {
return x + y;
},
tbb::static_partitioner());
}
return res_sum;
#else
return ov::parallel_sum2d(D0, D1, input, func);
#endif
}

template <typename T0, typename T1, typename T2, typename R, typename F>
[[nodiscard]] R cpu_parallel_sum3d(const T0& D0, const T1& D1, const T2& D2, const R& input, const F& func) {
#if OV_THREAD == OV_THREAD_TBB_ADAPTIVE
R res_sum = 0;
if (m_partitioner == ov::intel_cpu::TbbPartitioner::AUTO) {
res_sum = _TBB_REDUCE_FUNC(
tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2),
input,
[&](const tbb::blocked_range3d<T0, T1, T2>& r, R init) -> R {
R sum = init;
for (T0 dim1 = r.pages().begin(); dim1 < r.pages().end(); dim1++) {
for (T1 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
for (T2 dim3 = r.cols().begin(); dim3 < r.cols().end(); dim3++) {
sum += func(dim1, dim2, dim3);
}
}
}
return sum;
},
[](R x, R y) -> R {
return x + y;
});
} else {
res_sum = _TBB_REDUCE_FUNC(
tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2),
input,
[&](const tbb::blocked_range3d<T0, T1, T2>& r, R init) -> R {
R sum = init;
for (T0 dim1 = r.pages().begin(); dim1 < r.pages().end(); dim1++) {
for (T1 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
for (T2 dim3 = r.cols().begin(); dim3 < r.cols().end(); dim3++) {
sum += func(dim1, dim2, dim3);
}
}
}
return sum;
},
[](R x, R y) -> R {
return x + y;
},
tbb::static_partitioner());
}
return res_sum;
#else
return ov::parallel_sum3d(D0, D1, D2, input, func);
#endif
}

template <typename T0, typename F>
void cpu_parallel_for(const T0& D0, const F& func) const {
#if OV_THREAD == OV_THREAD_TBB_ADAPTIVE
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/intel_cpu/src/nodes/broadcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ bool Broadcast::isSupportedOperation(const std::shared_ptr<const ov::Node>& op,
}

Broadcast::Broadcast(const std::shared_ptr<ov::Node>& op, const GraphContext::CPtr& context)
: Node(op, context, NgraphShapeInferFactory(op)) {
: Node(op, context, NgraphShapeInferFactory(op)),
cpuParallel(context->getCpuParallel()) {
std::string errorMessage;
if (!isSupportedOperation(op, errorMessage)) {
OPENVINO_THROW_NOT_IMPLEMENTED(errorMessage);
Expand Down Expand Up @@ -212,7 +213,7 @@ void Broadcast::executeDynamicImpl(const dnnl::stream& strm) {

void Broadcast::execute(const dnnl::stream& strm) {
if (optimizedCase) {
optimizedExecute(getSrcMemoryAtPort(INPUT_DATA_IDX), getDstMemoryAtPort(0));
optimizedExecute(getSrcMemoryAtPort(INPUT_DATA_IDX), getDstMemoryAtPort(0), cpuParallel);
} else {
plainExecute(strm);
}
Expand Down
1 change: 1 addition & 0 deletions src/plugins/intel_cpu/src/nodes/broadcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Broadcast : public Node, public TileBroadcastCommon {

std::vector<int32_t> targetShape;
std::vector<int32_t> axesMapping;
std::shared_ptr<CpuParallel> cpuParallel;
};

} // namespace ov::intel_cpu::node
13 changes: 9 additions & 4 deletions src/plugins/intel_cpu/src/nodes/causal_mask_preprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
#include <string>
#include <vector>

#include "cpu_parallel.hpp"
#include "cpu_types.h"
#include "graph_context.h"
#include "memory_desc/cpu_memory_desc.h"
#include "node.h"
#include "onednn/iml_type_mapper.h"
#include "openvino/core/except.hpp"
#include "openvino/core/node.hpp"
#include "openvino/core/parallel.hpp"
#include "openvino/core/type.hpp"
#include "openvino/core/type/bfloat16.hpp"
#include "openvino/core/type/element_type.hpp"
Expand Down Expand Up @@ -59,6 +59,7 @@ The functionality is equivalent to following python code:
*/
template <typename T>
struct CausalMaskPreprocess::ExecutorCausalMaskPreprocess : public CausalMaskPreprocess::Executor {
ExecutorCausalMaskPreprocess(const std::shared_ptr<CpuParallel>& parallel) : cpuParallel(parallel) {}
void execute([[maybe_unused]] const dnnl::stream& strm,
intel_cpu::Node* pnode,
[[maybe_unused]] const intel_cpu::CausalMaskPreprocessNode::Config& config) override {
Expand Down Expand Up @@ -91,7 +92,7 @@ struct CausalMaskPreprocess::ExecutorCausalMaskPreprocess : public CausalMaskPre
auto* prow = t_cache_positions.ptr<int32_t>(0);
T min_dtype = std::numeric_limits<T>::lowest();

parallel_for2d(batch_size, qLen, [&](size_t n, size_t i) {
cpuParallel->parallel_for2d(batch_size, qLen, [&](size_t n, size_t i) {
auto* pamask = t_attention_mask.ptr<int32_t>(n, 0);
auto* pdst = t_dst.ptr<T>(n, 0, i);
auto row = static_cast<size_t>(prow[i]);
Expand All @@ -110,6 +111,9 @@ struct CausalMaskPreprocess::ExecutorCausalMaskPreprocess : public CausalMaskPre
});
DEBUG_LOG("CausalMaskPreprocess::execute dst=", t_dst);
}

private:
std::shared_ptr<CpuParallel> cpuParallel;
};

CausalMaskPreprocess::CausalMaskPreprocess(const std::shared_ptr<ov::Node>& op, const GraphContext::CPtr& context)
Expand Down Expand Up @@ -138,6 +142,7 @@ bool CausalMaskPreprocess::isSupportedOperation(const std::shared_ptr<const ov::
}

void CausalMaskPreprocess::initSupportedPrimitiveDescriptors() {
auto cpuParallel = context->getCpuParallel();
if (!supportedPrimitiveDescriptors.empty()) {
return;
}
Expand All @@ -148,10 +153,10 @@ void CausalMaskPreprocess::initSupportedPrimitiveDescriptors() {
// precision preferences
if (m_config.type == "CausalMaskPreprocess") {
if (oprecs[0] == ov::element::bf16) {
m_executor = std::make_shared<ExecutorCausalMaskPreprocess<ov::bfloat16>>();
m_executor = std::make_shared<ExecutorCausalMaskPreprocess<ov::bfloat16>>(cpuParallel);
} else {
// fallback to default precision
m_executor = std::make_shared<ExecutorCausalMaskPreprocess<float>>();
m_executor = std::make_shared<ExecutorCausalMaskPreprocess<float>>(cpuParallel);
oprecs[0] = ov::element::f32;
}
// all input precisions must be int32
Expand Down
Loading
Loading