Skip to content

Commit

Permalink
Rename entry points, simplify state machine interaction
Browse files Browse the repository at this point in the history
Signed-off-by: Isaev, Ilya <[email protected]>
  • Loading branch information
isaevil committed Jan 22, 2025
1 parent 0f1b1a5 commit c38eb2f
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 50 deletions.
12 changes: 6 additions & 6 deletions include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&,
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);

#if __TBB_PREVIEW_PARALLEL_PHASE
TBB_EXPORT void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC enter_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC exit_parallel_phase(d1::task_arena_base*, std::uintptr_t);
#endif
} // namespace r1

Expand Down Expand Up @@ -496,12 +496,12 @@ class task_arena : public task_arena_base {
#if __TBB_PREVIEW_PARALLEL_PHASE
void start_parallel_phase() {
initialize();
r1::register_parallel_phase(this, /*reserved*/0);
r1::enter_parallel_phase(this, /*reserved*/0);
}
void end_parallel_phase(bool with_fast_leave = false) {
__TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::unregister_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
r1::exit_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
}

class scoped_parallel_phase {
Expand Down Expand Up @@ -589,12 +589,12 @@ inline void enqueue(F&& f) {

#if __TBB_PREVIEW_PARALLEL_PHASE
inline void start_parallel_phase() {
r1::register_parallel_phase(nullptr, /*reserved*/0);
r1::enter_parallel_phase(nullptr, /*reserved*/0);
}

inline void end_parallel_phase(bool with_fast_leave) {
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::unregister_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
r1::exit_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
}
#endif

Expand Down
4 changes: 2 additions & 2 deletions rfcs/experimental/parallel_phase_for_task_arena/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ To implement the proposed feature, the following changes were made:
* Added a new entity `thread_leave_manager` to the `r1::arena` which is responsible for
for managing the state of workers' arena leaving behaviour.
* Introduced two new entry points to the library.
* `r1::register_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
* `r1::enter_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
the start of parallel phase with the library.
* `r1::unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
* `r1::exit_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
the end of parallel phase with the library.

### Thread Leave Manager
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 8 additions & 8 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ struct task_arena_impl {
static int max_concurrency(const d1::task_arena_base*);
static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
static d1::slot_id execution_slot(const d1::task_arena_base&);
static void register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
static void unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
static void enter_parallel_phase(d1::task_arena_base*, std::uintptr_t);
static void exit_parallel_phase(d1::task_arena_base*, std::uintptr_t);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
Expand Down Expand Up @@ -563,12 +563,12 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena)
return task_arena_impl::execution_slot(arena);
}

void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::register_parallel_phase(ta, flags);
void __TBB_EXPORTED_FUNC enter_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::enter_parallel_phase(ta, flags);
}

void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::unregister_parallel_phase(ta, flags);
void __TBB_EXPORTED_FUNC exit_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::exit_parallel_phase(ta, flags);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
Expand Down Expand Up @@ -918,14 +918,14 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
}

#if __TBB_PREVIEW_PARALLEL_PHASE
void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) {
void task_arena_impl::enter_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) {
arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
__TBB_ASSERT(a, nullptr);
a->my_thread_leave.register_parallel_phase();
a->advertise_new_work<arena::work_enqueued>();
}

void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
void task_arena_impl::exit_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
__TBB_ASSERT(a, nullptr);
a->my_thread_leave.unregister_parallel_phase(/*with_fast_leave=*/static_cast<bool>(flags));
Expand Down
40 changes: 16 additions & 24 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class thread_leave_manager {
static const std::uintptr_t ONE_TIME_FAST_LEAVE = 1 << 1;
static const std::uintptr_t PARALLEL_PHASE = 1 << 2;

std::atomic<std::uintptr_t> my_state{static_cast<std::uintptr_t>(-1)};
std::atomic<std::uintptr_t> my_state{UINTPTR_MAX};
public:
// This method is not thread-safe!
// Required to be called after construction to set initial state of the state machine.
Expand All @@ -202,39 +202,31 @@ class thread_leave_manager {
}

void reset_if_needed() {
std::uintptr_t curr = ONE_TIME_FAST_LEAVE;
if (my_state.load(std::memory_order_relaxed) == curr) {
// Potentially can override decision of the parallel block from future epoch
std::uintptr_t curr = my_state.load(std::memory_order_relaxed);
if (curr == ONE_TIME_FAST_LEAVE) {
// Potentially can override decision of the parallel phase from future epoch
// but it is not a problem because it does not violate the correctness
my_state.compare_exchange_strong(curr, DELAYED_LEAVE);
my_state.fetch_and(~ONE_TIME_FAST_LEAVE);
}
}

// Indicate start of parallel phase in the state machine
void register_parallel_phase() {
std::uintptr_t prev = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(prev != std::uintptr_t(-1), "The initial state was not set");

std::uintptr_t desired{};
do {
// Need to add a reference for this start of a parallel phase, preserving the leave
// policy. Except for the case when one time fast leave was requested at the end of a
// previous phase.
desired = prev;
if (prev == ONE_TIME_FAST_LEAVE) {
// State was previously transitioned to "One-time Fast leave", thus with the start
// of new parallel phase, it should be transitioned to "Delayed leave"
desired = DELAYED_LEAVE;
}
__TBB_ASSERT(desired + PARALLEL_PHASE > desired, "Overflow detected");
desired += PARALLEL_PHASE; // Take into account this start of a parallel phase
} while (!my_state.compare_exchange_strong(prev, desired));
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != UINTPTR_MAX, "The initial state was not set");

std::uintptr_t prev = my_state.fetch_add(PARALLEL_PHASE);
__TBB_ASSERT(prev + PARALLEL_PHASE > prev, "Overflow detected");
if (prev & ONE_TIME_FAST_LEAVE) {
// State was previously transitioned to "One-time Fast leave", thus with the start
// of new parallel phase, it should be reset
my_state.fetch_and(~ONE_TIME_FAST_LEAVE);
}
}

// Indicate the end of parallel phase in the state machine
void unregister_parallel_phase(bool enable_fast_leave) {
std::uintptr_t prev = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(prev != std::uintptr_t(-1), "The initial state was not set");
__TBB_ASSERT(prev != UINTPTR_MAX, "The initial state was not set");

std::uintptr_t desired{};
do {
Expand All @@ -249,7 +241,7 @@ class thread_leave_manager {

bool is_retention_allowed() {
std::uintptr_t curr = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(curr != std::uintptr_t(-1), "The initial state was not set");
__TBB_ASSERT(curr != UINTPTR_MAX, "The initial state was not set");
return curr != FAST_LEAVE && curr != ONE_TIME_FAST_LEAVE;
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/def/lin32-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEj;
_ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEj;
_ZN3tbb6detail2r119exit_parallel_phaseEPNS0_2d115task_arena_baseEj;
_ZN3tbb6detail2r120enter_parallel_phaseEPNS0_2d115task_arena_baseEj;

/* System topology parsing and threads pinning (governor.cpp) */
_ZN3tbb6detail2r115numa_node_countEv;
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/def/lin64-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEm;
_ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEm;
_ZN3tbb6detail2r119exit_parallel_phaseEPNS0_2d115task_arena_baseEm;
_ZN3tbb6detail2r120enter_parallel_phaseEPNS0_2d115task_arena_baseEm;

/* System topology parsing and threads pinning (governor.cpp) */
_ZN3tbb6detail2r115numa_node_countEv;
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/def/mac64-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ __ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE
__ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE
__ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE
__ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE
__ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEm
__ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEm
__ZN3tbb6detail2r119exit_parallel_phaseEPNS0_2d115task_arena_baseEm
__ZN3tbb6detail2r120enter_parallel_phaseEPNS0_2d115task_arena_baseEm

# System topology parsing and threads pinning (governor.cpp)
__ZN3tbb6detail2r115numa_node_countEv
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/def/win32-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ EXPORTS
?wait@r1@detail@tbb@@YAXAAVtask_arena_base@d1@23@@Z
?enqueue@r1@detail@tbb@@YAXAAVtask@d1@23@AAVtask_group_context@523@PAVtask_arena_base@523@@Z
?execution_slot@r1@detail@tbb@@YAGABVtask_arena_base@d1@23@@Z
?register_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z
?unregister_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z
?enter_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z
?exit_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z

; System topology parsing and threads pinning (governor.cpp)
?numa_node_count@r1@detail@tbb@@YAIXZ
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/def/win64-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ EXPORTS
?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@PEAVtask_arena_base@523@@Z
?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@AEAVtask_group_context@523@PEAVtask_arena_base@523@@Z
?execution_slot@r1@detail@tbb@@YAGAEBVtask_arena_base@d1@23@@Z
?register_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z
?unregister_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z
?enter_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z
?exit_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z

; System topology parsing and threads pinning (governor.cpp)
?numa_node_count@r1@detail@tbb@@YAIXZ
Expand Down

0 comments on commit c38eb2f

Please sign in to comment.