diff --git a/cudax/include/cuda/experimental/__stf/graph/graph_task.cuh b/cudax/include/cuda/experimental/__stf/graph/graph_task.cuh index a114e8f5e6e..c26a3909834 100644 --- a/cudax/include/cuda/experimental/__stf/graph/graph_task.cuh +++ b/cudax/include/cuda/experimental/__stf/graph/graph_task.cuh @@ -248,52 +248,12 @@ public: return *this; } - void populate_deps_scheduling_info() const - { - // Error checking copied from acquire() in acquire_release() - - int index = 0; - const auto& deps = get_task_deps(); - for (const auto& dep : deps) - { - if (!dep.get_data().is_initialized()) - { - fprintf(stderr, "Error: dependency number %d is an uninitialized logical data.\n", index); - abort(); - } - dep.set_symbol(dep.get_data().get_symbol()); - dep.set_data_footprint(dep.get_data().get_data_interface().data_footprint()); - index++; - } - } - /** - * @brief Use the scheduler to assign a device to this task - * - * @return returns true if the task's time needs to be recorded + * @brief Determine if the task's time needs to be recorded (for DOT visualization) */ - bool schedule_task() + bool should_record_time() { - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); - - const bool is_auto = get_exec_place().affine_data_place() == data_place::device_auto(); - bool calibrate = false; - - // We need to know the data footprint if scheduling or calibrating tasks - if (is_auto || statistics.is_calibrating()) - { - populate_deps_scheduling_info(); - } - - if (is_auto) - { - auto [place, needs_calibration] = ctx.schedule_task(*this); - set_exec_place(place); - calibrate = needs_calibration; - } - - return dot.is_timing() || (calibrate && statistics.is_calibrating()); + return ctx.get_dot()->is_timing(); } // Only valid if we have defined a capture stream @@ -312,48 +272,23 @@ public: template void operator->*(Fun&& f) { - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); - - // cudaEvent_t start_event, end_event; + auto& dot = *ctx.get_dot(); - bool record_time = schedule_task(); - - if (statistics.is_calibrating_to_file()) - { - record_time = true; - } + bool record_time = should_record_time(); start(); - if (record_time) - { - // Events must be created here to avoid issues with multi-gpu - // cuda_safe_call(cudaEventCreate(&start_event)); - // cuda_safe_call(cudaEventCreate(&end_event)); - // cuda_safe_call(cudaEventRecord(start_event)); - } - SCOPE(exit) { end_uncleared(); if (record_time) { - // cuda_safe_call(cudaEventRecord(end_event)); - // cuda_safe_call(cudaEventSynchronize(end_event)); - float milliseconds = 0; - // cuda_safe_call(cudaEventElapsedTime(&milliseconds, start_event, end_event)); if (dot.is_tracing()) { dot.template add_vertex_timing(*this, milliseconds); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(*this, milliseconds); - } } clear(); }; @@ -569,48 +504,23 @@ public: template void operator->*(Fun&& f) { - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); - - // cudaEvent_t start_event, end_event; - - bool record_time = schedule_task(); + auto& dot = *ctx.get_dot(); - if (statistics.is_calibrating_to_file()) - { - record_time = true; - } + bool record_time = should_record_time(); start(); - if (record_time) - { - // Events must be created here to avoid issues with multi-gpu - // cuda_safe_call(cudaEventCreate(&start_event)); - // cuda_safe_call(cudaEventCreate(&end_event)); - // cuda_safe_call(cudaEventRecord(start_event)); - } - SCOPE(exit) { end_uncleared(); if (record_time) { - // cuda_safe_call(cudaEventRecord(end_event)); - // cuda_safe_call(cudaEventSynchronize(end_event)); - float milliseconds = 0; - // cuda_safe_call(cudaEventElapsedTime(&milliseconds, start_event, end_event)); if (dot.is_tracing()) { dot.template add_vertex_timing(*this, milliseconds); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(*this, milliseconds); - } } clear(); }; diff --git a/cudax/include/cuda/experimental/__stf/internal/acquire_release.cuh b/cudax/include/cuda/experimental/__stf/internal/acquire_release.cuh index 1af2079906d..0f386e5262e 100644 --- a/cudax/include/cuda/experimental/__stf/internal/acquire_release.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/acquire_release.cuh @@ -49,8 +49,7 @@ namespace cuda::experimental::stf * generated during the acquisition of dependencies. This list represents the * prerequisites for the task to start execution. * - * @note The function `EXPECT`s the task to be in the setup phase and the execution place - * not to be `exec_place::device_auto()`. + * @note The function `EXPECT`s the task to be in the setup phase. * @note Dependencies are sorted by logical data addresses to prevent deadlocks. * @note For tasks with multiple dependencies on the same logical data, only one * instance of the data is used, and its access mode is determined by combining @@ -61,7 +60,6 @@ inline event_list task::acquire(backend_ctx_untyped& ctx) EXPECT(get_task_phase() == task::phase::setup); const auto eplace = get_exec_place(); - _CCCL_ASSERT(eplace != exec_place::device_auto(), ""); // If there are any extra dependencies to fulfill auto result = get_input_events(); diff --git a/cudax/include/cuda/experimental/__stf/internal/backend_ctx.cuh b/cudax/include/cuda/experimental/__stf/internal/backend_ctx.cuh index e400d5c22d5..f17770fc997 100644 --- a/cudax/include/cuda/experimental/__stf/internal/backend_ctx.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/backend_ctx.cuh @@ -33,9 +33,7 @@ #include // backend_ctx::launch() uses execution_policy #include #include // backend_ctx_untyped::impl usese machine -#include // backend_ctx_untyped::impl uses reorderer #include -#include // backend_ctx_untyped::impl uses scheduler #include // backend_ctx uses shape_of #include #include @@ -114,9 +112,7 @@ protected: friend class backend_ctx_untyped; impl(async_resources_handle async_resources = async_resources_handle()) - : auto_scheduler(reserved::scheduler::make(getenv("CUDASTF_SCHEDULE"))) - , auto_reorderer(reserved::reorderer::make(getenv("CUDASTF_TASK_ORDER"))) - , async_resources(async_resources ? mv(async_resources) : async_resources_handle()) + : async_resources(async_resources ? mv(async_resources) : async_resources_handle()) { // Forces init cudaError_t ret = cudaFree(0); @@ -320,7 +316,7 @@ protected: void cleanup() { attached_allocators.clear(); - // Leave custom_allocator, auto_scheduler, and auto_reordered as they were. + // Leave custom_allocator as it was. } /* Current context-wide allocator (same as default_allocator unless it is changed) */ @@ -333,8 +329,6 @@ protected: ::std::vector attached_allocators; reserved::composite_slice_cache composite_cache; - ::std::unique_ptr auto_scheduler; - ::std::unique_ptr auto_reorderer; // Stats-related stuff ::std::unordered_map<::std::pair, ::std::pair, @@ -664,31 +658,11 @@ public: return pimpl->async_resources; } - bool reordering_tasks() const - { - assert(pimpl); - return pimpl->auto_reorderer != nullptr; - } - auto& get_composite_cache() { return pimpl->composite_cache; } - ::std::pair schedule_task(const task& t) const - { - assert(pimpl); - assert(pimpl->auto_scheduler); - return pimpl->auto_scheduler->schedule_task(t); - } - - void reorder_tasks(::std::vector& tasks, ::std::unordered_map& task_map) - { - assert(pimpl); - assert(pimpl->auto_reorderer); - pimpl->auto_reorderer->reorder_tasks(tasks, task_map); - } - void increment_task_count() { ++pimpl->total_task_cnt; diff --git a/cudax/include/cuda/experimental/__stf/internal/cuda_kernel_scope.cuh b/cudax/include/cuda/experimental/__stf/internal/cuda_kernel_scope.cuh index 7614445c5cd..89fb94bae1c 100644 --- a/cudax/include/cuda/experimental/__stf/internal/cuda_kernel_scope.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/cuda_kernel_scope.cuh @@ -29,7 +29,6 @@ #include #include -#include namespace cuda::experimental::stf { @@ -353,9 +352,7 @@ public: t.set_symbol(symbol); } - // Do we need to measure the duration of the kernel(s) ? - auto& statistics = reserved::task_statistics::instance(); - record_time = t.schedule_task() || statistics.is_calibrating_to_file(); + record_time = t.should_record_time(); record_time_device = -1; t.start(); @@ -401,12 +398,6 @@ public: { dot.template add_vertex_timing(t, milliseconds, record_time_device); } - - auto& statistics = reserved::task_statistics::instance(); - if (statistics.is_calibrating()) - { - statistics.log_task_time(t, milliseconds); - } } } diff --git a/cudax/include/cuda/experimental/__stf/internal/host_launch_scope.cuh b/cudax/include/cuda/experimental/__stf/internal/host_launch_scope.cuh index 92b7e8896cd..0b9ecfb0bab 100644 --- a/cudax/include/cuda/experimental/__stf/internal/host_launch_scope.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/host_launch_scope.cuh @@ -30,7 +30,6 @@ #include #include #include -#include #include #include @@ -114,8 +113,7 @@ public: template void operator->*(Fun&& f) { - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); + auto& dot = *ctx.get_dot(); auto t = ctx.task(exec_place::host()); t.add_deps(deps); @@ -125,7 +123,7 @@ public: } cudaEvent_t start_event, end_event; - const bool record_time = t.schedule_task() || statistics.is_calibrating_to_file(); + const bool record_time = t.should_record_time(); t.start(); @@ -156,11 +154,6 @@ public: { dot.template add_vertex_timing(t, milliseconds, -1); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(t, milliseconds); - } } } t.clear(); diff --git a/cudax/include/cuda/experimental/__stf/internal/launch.cuh b/cudax/include/cuda/experimental/__stf/internal/launch.cuh index 05be8406203..709b49f3640 100644 --- a/cudax/include/cuda/experimental/__stf/internal/launch.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/launch.cuh @@ -23,7 +23,6 @@ #include // launch_impl() uses execution_policy #include #include -#include #include #include // graph_launch_impl() uses SCOPE @@ -322,21 +321,14 @@ public: EXPECT(e_place != exec_place::host(), "Attempt to run a launch on the host."); - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); + auto& dot = *ctx.get_dot(); auto t = ctx.task(e_place); _CCCL_ASSERT(e_place.affine_data_place() == t.get_affine_data_place(), "Affine data places must match"); - /* - * If we have a grid (including 1-element grids), the implicit affine partitioner is the blocked_partition. - * - * An explicit composite data place is required per data dependency to customize this behaviour. - */ if (e_place.size() > 1) { - // Create a composite data place defined by the grid of places + the partitioning function t.set_affine_data_place(data_place::composite(blocked_partition(), e_place.as_grid())); } @@ -346,14 +338,8 @@ public: t.set_symbol(symbol); } - bool record_time = t.schedule_task(); - // Execution place may have changed during scheduling task - e_place = t.get_exec_place(); - - if (statistics.is_calibrating_to_file()) - { - record_time = true; - } + bool record_time = t.should_record_time(); + e_place = t.get_exec_place(); nvtx_range nr(t.get_symbol().c_str()); t.start(); @@ -415,11 +401,6 @@ public: { dot.template add_vertex_timing>(t, milliseconds, device); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(t, milliseconds); - } } } diff --git a/cudax/include/cuda/experimental/__stf/internal/parallel_for_scope.cuh b/cudax/include/cuda/experimental/__stf/internal/parallel_for_scope.cuh index 8033bd00cbe..9b01a3d89c7 100644 --- a/cudax/include/cuda/experimental/__stf/internal/parallel_for_scope.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/parallel_for_scope.cuh @@ -26,7 +26,6 @@ #include // for null_partition #include #include -#include #include namespace cuda::experimental::stf @@ -541,19 +540,15 @@ public: template void operator->*(Fun&& f) { - auto& dot = *ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); - auto t = ctx.task(e_place); + auto& dot = *ctx.get_dot(); + auto t = ctx.task(e_place); assert(e_place.affine_data_place() == t.get_affine_data_place()); - // If there is a partitioner, we ensure there is a proper affine data place for this execution place if constexpr (!::std::is_same_v) { - // Grids need a composite data place if (e_place.size() > 1) { - // Create a composite data place defined by the grid of places + the partitioning function t.set_affine_data_place(data_place::composite(partitioner_t(), e_place.as_grid())); } } @@ -564,7 +559,7 @@ public: t.set_symbol(symbol); } - const bool record_time = t.schedule_task() || statistics.is_calibrating_to_file(); + const bool record_time = t.should_record_time(); nvtx_range nr(t.get_symbol().c_str()); t.start(); @@ -589,11 +584,6 @@ public: { dot.template add_vertex_timing(t, milliseconds, device); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(t, milliseconds); - } } } @@ -711,7 +701,6 @@ public: { // parallel_for never calls this function with a host. _CCCL_ASSERT(sub_exec_place != exec_place::host(), "Internal CUDASTF error."); - _CCCL_ASSERT(sub_exec_place != exec_place::device_auto(), "Internal CUDASTF error."); using Fun_no_ref = ::std::remove_reference_t; @@ -898,12 +887,6 @@ public: // parallel_for never calls this function with a host. _CCCL_ASSERT(sub_exec_place != exec_place::host(), "Internal CUDASTF error."); - if (sub_exec_place == exec_place::device_auto()) - { - // We have all latitude - recurse with the current device. - return do_parallel_for(::std::forward(f), exec_place::current_device(), sub_shape, t); - } - using Fun_no_ref = ::std::remove_reference_t; static const auto conf = [] { diff --git a/cudax/include/cuda/experimental/__stf/internal/reorderer.cuh b/cudax/include/cuda/experimental/__stf/internal/reorderer.cuh deleted file mode 100644 index 868df3c5125..00000000000 --- a/cudax/include/cuda/experimental/__stf/internal/reorderer.cuh +++ /dev/null @@ -1,379 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// Part of CUDASTF in CUDA C++ Core Libraries, -// under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. -// -//===----------------------------------------------------------------------===// - -/** @file - * - * @brief Implements automatic task reordering - * - */ - -#pragma once - -#include - -#if defined(_CCCL_IMPLICIT_SYSTEM_HEADER_GCC) -# pragma GCC system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_CLANG) -# pragma clang system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_MSVC) -# pragma system_header -#endif // no system header - -#include // reorderer_payload uses task_dep_vector_untyped -#include // heft_scheduler uses statistics_t - -#include // ::std::shuffle -#include // ::std::function -#include // ::std::unique_ptr -#include -#include -#include -#include -#include - -namespace cuda::experimental::stf::reserved -{ -/** - * @brief We cannot pass deferred_stream_task<> to the reorderer due to circular - * dependencies, so we store all the necessary info in this struct instead. - */ -struct reorderer_payload -{ - reorderer_payload( - ::std::string s, int id, ::std::unordered_set succ, ::std::unordered_set pred, task_dep_vector_untyped d) - : symbol(mv(s)) - , mapping_id(id) - , successors(mv(succ)) - , predecessors(mv(pred)) - , deps(mv(d)) - {} - - reorderer_payload() = delete; - - bool done = false; - bool done_execution = false; - double upward_rank = -1.0; - int device = -1; - - size_t num_successors() const - { - return successors.size(); - } - size_t num_predecessors() const - { - return predecessors.size(); - } - - /// Needed for task_statistics - const ::std::string& get_symbol() const - { - return symbol; - } - - /// Needed for task_statistics - const task_dep_vector_untyped& get_task_deps() const - { - return deps; - } - - ::std::string symbol; - int mapping_id; - ::std::unordered_set successors; - ::std::unordered_set predecessors; - task_dep_vector_untyped deps; -}; - -/** - * @brief The reorderer class defines the interface for all reorderers - */ -class reorderer -{ -public: - /** - * @brief Reorder a vector of tasks - * - * @param tasks The vector of tasks to be reordered - */ - virtual void reorder_tasks(::std::vector& tasks, ::std::unordered_map& task_map) = 0; - - /// @brief Destructor for the reorderer - virtual ~reorderer() = default; - - static ::std::unique_ptr make(const char* reorderer_type); - -protected: - reorderer() = default; - - const int num_devices = cuda_try(); -}; - -class random_reorderer : public reorderer -{ -public: - random_reorderer() = default; - - void reorder_tasks(::std::vector& tasks, ::std::unordered_map&) override - { - ::std::shuffle(::std::begin(tasks), ::std::end(tasks), gen); - } - -private: - ::std::mt19937 gen = ::std::mt19937(::std::random_device()()); -}; - -class heft_reorderer : public reorderer -{ -public: - heft_reorderer() - : reorderer() - { - const char* filename = getenv("CUDASTF_TASK_STATISTICS"); - - if (filename) - { - statistics.read_statistics_file(filename); - } - else - { - statistics.enable_calibration(); - } - } - - void reorder_tasks(::std::vector& tasks, ::std::unordered_map& task_map) override - { - calculate_upward_ranks(tasks, task_map); - rearrange_tasks(tasks, task_map); - } - -private: - void calculate_upward_ranks(const ::std::vector& tasks, - ::std::unordered_map& task_map) const - { - ::std::queue work_list; // queue of mapping ids - ::std::unordered_set tasks_done; - - double comm_cost = 0.2; - - // Initialize the work_list with the leaf tasks - for (int id : tasks) - { - auto& t = task_map.at(id); - if (t.num_successors() == 0) - { - work_list.push(t.mapping_id); - } - } - - while (work_list.size() > 0) - { - auto& current_task = task_map.at(work_list.front()); - work_list.pop(); - - tasks_done.insert(current_task.mapping_id); - current_task.done = true; - - // The second term in the upward_rank equation that gets added to the task cost - double second_term = 0.0; - for (int s : current_task.successors) - { - const auto& succ = task_map.at(s); - assert(succ.upward_rank != -1); - second_term = ::std::max(second_term, comm_cost + succ.upward_rank); - } - - ::std::pair stats; - double task_cost; - if (current_task.get_symbol().rfind("task ", 0) == 0) - { - task_cost = 0; - } - else - { - stats = statistics.get_task_stats(current_task); - task_cost = ::std::get<0>(stats); - } - current_task.upward_rank = task_cost + second_term; - - for (int p : current_task.predecessors) - { - const auto& pred = task_map.at(p); - if (tasks_done.count(p)) - { - continue; - } - - bool add_it = true; - for (int s : pred.successors) - { - const auto& succ = task_map.at(s); - if (succ.upward_rank == -1) - { - add_it = false; - break; - } - } - - if (add_it) - { - work_list.push(pred.mapping_id); - } - } - } - } - - /** - * @brief Now that we've calculated the upward ranks, we need to rearrange - * the tasks. This isn't as simple as sorting the vector of tasks according - * to the upward rank, as we need to take into account when tasks are ready. - */ - void rearrange_tasks(::std::vector& tasks, ::std::unordered_map& task_map) const - { - using task_priority = ::std::pair; // the double is the upward rank, needed for sorting. Should I just use - // the reorderer_payload - auto cmp = [](const task_priority& p1, const task_priority& p2) { - return p1.second < p2.second; - }; - ::std::priority_queue, decltype(cmp)> ready_tasks(cmp); - - for (int id : tasks) - { - const auto& t = task_map.at(id); - if (t.num_predecessors() == 0) - { - ready_tasks.emplace(t.mapping_id, t.upward_rank); - } - } - - ::std::unordered_set tasks_done; // shouldn't be necessary but we need it now - ::std::vector actual_order; - - while (ready_tasks.size() > 0) - { - auto [id, upward_rank] = ready_tasks.top(); - ready_tasks.pop(); - - auto& current_task = task_map.at(id); - current_task.done_execution = true; - tasks_done.insert(id); - actual_order.push_back(id); - - for (int succ_id : current_task.successors) - { - if (tasks_done.count(succ_id)) - { - continue; - } - - const auto& succ = task_map.at(succ_id); - bool is_ready_now = true; - - for (int pred_id : succ.predecessors) - { - const auto& pred = task_map.at(pred_id); - - if (!pred.done_execution) - { - is_ready_now = false; - } - } - - if (is_ready_now) - { - ready_tasks.emplace(succ_id, succ.upward_rank); - } - } - } - - tasks = mv(actual_order); - } - - task_statistics& statistics = task_statistics::instance(); -}; - -class post_mortem_reorderer : public reorderer -{ -public: - post_mortem_reorderer(const char* order_file) - : reorderer() - { - read_order_file(order_file); - } - -private: - void reorder_tasks(::std::vector& tasks, ::std::unordered_map&) override - { - tasks = file_order; - } - - /* Read the csv schedule file mapping tasks to devices */ - void read_order_file(const char* filename) - { - ::std::ifstream file(filename); - EXPECT(file, "Failed to open order file: '", filename, "'."); - - int current_line = 0; - for (::std::string line; ::std::getline(file, line); ++current_line) - { - ::std::stringstream ss(line); - - int mapping_id = -1; - - int column = 0; - for (::std::string cell; ::std::getline(ss, cell, ','); ++column) - { - if (column == 1) - { - mapping_id = ::std::stoi(cell); - } - } - - EXPECT(mapping_id >= 0, "Invalid mapping id value '", mapping_id, "' provided on line '", current_line, "'."); - - file_order.push_back(mapping_id); - } - } - - ::std::vector file_order; -}; - -inline ::std::unique_ptr reorderer::make(const char* reorderer_type) -{ - if (!reorderer_type) - { - return nullptr; - } - - const auto reorderer_type_s = ::std::string(reorderer_type); - - if (reorderer_type_s == "random") - { - return ::std::make_unique(); - } - - if (reorderer_type_s == "heft") - { - return ::std::make_unique(); - } - - if (reorderer_type_s == "post_mortem") - { - const char* order_file = getenv("CUDASTF_ORDER_FILE"); - - EXPECT(order_file, "CUDASTF_TASK_ORDER set to 'post_mortem' but CUDASTF_SCHEDULE_FILE is unset."); - EXPECT(::std::filesystem::exists(order_file), "CUDASTF_ORDER_FILE '", order_file, "' does not exist"); - - return ::std::make_unique(order_file); - } - - fprintf(stderr, "Invalid CUDASTF_TASK_ORDER value '%s'\n", reorderer_type); - abort(); -} -} // namespace cuda::experimental::stf::reserved diff --git a/cudax/include/cuda/experimental/__stf/internal/scheduler.cuh b/cudax/include/cuda/experimental/__stf/internal/scheduler.cuh deleted file mode 100644 index e6af82783ec..00000000000 --- a/cudax/include/cuda/experimental/__stf/internal/scheduler.cuh +++ /dev/null @@ -1,465 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// Part of CUDASTF in CUDA C++ Core Libraries, -// under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. -// -//===----------------------------------------------------------------------===// - -/** - * @file - * - * @brief Implements automatic task scheduling - * - * CUDASTF_SCHEDULE - * CUDASTF_SCHEDULE_FILE - */ - -#pragma once - -#include - -#if defined(_CCCL_IMPLICIT_SYSTEM_HEADER_GCC) -# pragma GCC system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_CLANG) -# pragma clang system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_MSVC) -# pragma system_header -#endif // no system header - -#include // scheduler uses task -#include // heft_scheduler uses statistics_t - -#include // rand() -#include // ::std::filesystem::exists -#include // ::std::ifstream -#include -#include // ::std::numeric_limits::max() -#include // random_scheduler uses rng -#include // ::std::stringstream -#include -#include -#include - -namespace cuda::experimental::stf::reserved -{ -/** - * @brief The scheduler class defines the interface that all schedulers must follow to assign tasks to devices - */ -class scheduler -{ -public: - scheduler() - { - cuda_safe_call(cudaGetDeviceCount(&num_devices)); - assert(num_devices > 0); - } - - /** - * @brief Assign a task to a device - * - * @param mapping_id The mapping ID of the task - * @return The device ID of the assigned device and a boolean whether this task still needs calibration - */ - virtual ::std::pair schedule_task(const task& t) = 0; - - /// @brief Destructor for the scheduler - virtual ~scheduler() = default; - - static ::std::unique_ptr make(const char* schedule_type); - -protected: - int num_devices = 0; - - // Map from task id to device - using schedule_t = ::std::unordered_map>; -}; - -class random_scheduler : public scheduler -{ -public: - ::std::pair schedule_task(const task&) override - { - return {exec_place::device(dist(gen)), false}; - } - -private: - ::std::mt19937 gen = ::std::mt19937(::std::random_device()()); - ::std::uniform_int_distribution<> dist = ::std::uniform_int_distribution<>(0, num_devices - 1); -}; - -class round_robin_scheduler : public scheduler -{ -public: - round_robin_scheduler() = default; - - ::std::pair schedule_task(const task&) override - { - return {exec_place::device(current_device++ % num_devices), false}; - } - -private: - int current_device = 0; -}; - -class post_mortem_scheduler : public scheduler -{ -public: - post_mortem_scheduler(const char* schedule_file) - { - read_schedule_file(schedule_file); - } - - ::std::pair schedule_task(const task& t) override - { - return {exec_place::device(schedule[t.get_mapping_id()]), false}; - } - -private: - schedule_t schedule; - - /* Read the csv schedule file mapping tasks to devices */ - void read_schedule_file(const char* filename) - { - ::std::ifstream file(filename); - EXPECT(file, "Failed to open schedule file: '", filename, "'."); - - int current_line = 0; - for (::std::string line; ::std::getline(file, line); ++current_line) - { - ::std::stringstream ss(line); - - int mapping_id = -1; - int dev_id = -1; - - int column = 0; - for (::std::string cell; ::std::getline(ss, cell, ','); ++column) - { - if (column == 0) - { - dev_id = ::std::stoi(cell); - } - else if (column == 1) - { - mapping_id = ::std::stoi(cell); - } - } - - EXPECT(dev_id >= 0, "Invalid device id value '", dev_id, "' provided on line '", current_line, "'."); - EXPECT(mapping_id >= 0, "Invalid mapping id value '", mapping_id, "' provided on line '", current_line, "'."); - - schedule[mapping_id] = dev_id; - } - } -}; - -class heft_scheduler : public scheduler -{ -public: - heft_scheduler() - : gpu_loads(num_devices, 0.0) - , msi(num_devices) - { - const char* filename = getenv("CUDASTF_TASK_STATISTICS"); - - if (filename) - { - statistics.read_statistics_file(filename); - } - else - { - statistics.enable_calibration(); - } - } - - ::std::pair schedule_task(const task& t) override - { - auto [task_cost, num_calls] = statistics.get_task_stats(t); - - if (num_calls == 0) - { - task_cost = default_cost; - } - - double best_end = ::std::numeric_limits::max(); - int best_device = -1; - - for (int i = 0; i < num_devices; i++) - { - int current_device = i; - - double total_cost = cost_on_device(t, current_device, task_cost, gpu_loads[current_device]); - if (total_cost < best_end) - { - best_device = current_device; - best_end = total_cost; - } - } - - gpu_loads[best_device] = best_end; - schedule[t.get_mapping_id()] = best_device; - - auto& deps = t.get_task_deps(); - for (const auto& dep : deps) - { - msi.update_msi_for_dep(best_device, dep, best_end); - } - - bool needs_calibration = num_calls < num_samples; - - return {exec_place::device(best_device), needs_calibration}; - } - - ~heft_scheduler() - { - const char* schedule_file = getenv("CUDASTF_HEFT_SCHEDULE"); - if (schedule_file) - { - write_schedule_file(schedule_file); - } - } - -private: - class msi_protocol - { - public: - msi_protocol(int num_devices) - : num_devices(num_devices) - {} - - double when_available(int device_id, const task_dep_untyped& dep) - { - auto& info = get_symbol_info(dep.get_symbol()); - const ::std::pair& device_info = info[device_id]; - - msi_state device_state = device_info.first; - switch (device_state) - { - case msi_state::modified: - case msi_state::shared: - return device_info.second; - default: - break; - } - - double earliest = get_earliest(dep); - return earliest; - } - - void update_msi_for_dep(int device_id, const task_dep_untyped& dep, double task_end) - { - const ::std::string& symbol = dep.get_symbol(); - const access_mode mode = dep.get_access_mode(); - - auto& info = get_symbol_info(symbol); - ::std::pair& device_info = info[device_id]; - - // Update local state first - switch (device_info.first) - { - case msi_state::modified: - device_info.second = task_end; - break; - case msi_state::shared: - if (mode != access_mode::read) - { - device_info.first = msi_state::modified; - device_info.second = task_end; - } - break; - case msi_state::invalid: - if (mode == access_mode::read) - { - device_info.first = msi_state::shared; - device_info.second = get_earliest(dep); - } - else - { - device_info.first = msi_state::modified; - device_info.second = task_end; - } - break; - } - - for (int i = 0; i < num_devices; i++) - { - if (i == device_id) // already updated - { - continue; - } - - ::std::pair& i_info = info[i]; - - msi_state state = i_info.first; - switch (state) - { - case msi_state::modified: - if (mode == access_mode::read) - { - i_info.first = msi_state::shared; - } - else - { - i_info.first = msi_state::invalid; - i_info.second = -1.0; - } - break; - case msi_state::shared: - if (mode != access_mode::read) - { - i_info.first = msi_state::invalid; - i_info.second = -1.0; - } - break; - default: - assert(state == msi_state::invalid); - break; - } - } - } - - private: - enum class msi_state : unsigned int - { - modified = 0, - shared = 1, - invalid = 2 - }; - - int num_devices; - const double bandwidth = 250 * 1e5; // Bytes/ms, Obtained by running - // cuda-samples/Samples/5_Domain_Specific/p2pBandwidthLatencyTest - using cache_state = ::std::vector<::std::pair>; - ::std::unordered_map<::std::string, cache_state> cache; - - cache_state& get_symbol_info(const ::std::string& symbol) - { - auto it = cache.find(symbol); - if (it == cache.end()) - { - cache_state state(num_devices, ::std::make_pair(msi_state::invalid, -1.0)); - cache.emplace(symbol, mv(state)); - it = cache.find(symbol); - } - - return it->second; - } - - double get_earliest(const task_dep_untyped& dep) const - { - const ::std::string& symbol = dep.get_symbol(); - const auto& info = cache.at(symbol); // need to use at() to keep method const - double earliest = ::std::numeric_limits::max(); - - bool found_one = false; - - double comm_cost = dep.get_data_footprint() / bandwidth; - - for (int i = 0; i < num_devices; i++) - { - const ::std::pair& device_info = info[i]; - if (device_info.first != msi_state::invalid) - { - found_one = true; - earliest = ::std::min(earliest, device_info.second + comm_cost); - } - } - - if (!found_one) // If the data is not on any device - { - earliest = comm_cost; - } - - return earliest; - } - }; - - template - double cost_on_device(const task_type& t, int device_id, double task_cost, double when_can_start) - { - double data_available = 0.0; - - const auto& deps = t.get_task_deps(); - for (const auto& dep : deps) - { - access_mode mode = dep.get_access_mode(); - switch (mode) - { - case access_mode::read: - case access_mode::rw: - data_available = ::std::max(data_available, msi.when_available(device_id, dep)); - break; - default: - break; - } - } - - double possible_start = ::std::max(when_can_start, data_available); - double end = possible_start + task_cost; - - return end; - } - - void write_schedule_file(const char* schedule_file) const - { - ::std::ofstream file(schedule_file); - if (!file) - { - ::std::cerr << "Failed to write to heft schedule file '" << schedule_file << "'.\n"; - return; - } - - for (const auto& [device_id, mapping_id] : schedule) - { - file << mapping_id << "," << device_id << '\n'; - } - } - - ::std::vector gpu_loads; // TODO: is it better to use a ::std::array? - task_statistics& statistics = task_statistics::instance(); - msi_protocol msi; - schedule_t schedule; - const int num_samples = 5; - const double default_cost = 0.5; -}; - -inline ::std::unique_ptr scheduler::make(const char* schedule_type) -{ - if (!schedule_type) - { - return nullptr; - } - - const auto schedule_type_s = ::std::string(schedule_type); - - if (schedule_type_s == "post_mortem") - { - const char* schedule_file = getenv("CUDASTF_SCHEDULE_FILE"); - - EXPECT(schedule_file, "CUDASTF_SCHEDULE set to 'post_mortem' but CUDASTF_SCHEDULE_FILE is unset."); - EXPECT(::std::filesystem::exists(schedule_file), "CUDASTF_SCHEDULE_FILE '", schedule_file, "' does not exist"); - - return ::std::make_unique(schedule_file); - } - - if (schedule_type_s == "random") - { - return ::std::make_unique(); - } - - if (schedule_type_s == "round_robin") - { - return ::std::make_unique(); - } - - if (schedule_type_s == "heft") - { - return ::std::make_unique(); - } - - ::std::cerr << "Invalid CUDASTF_SCHEDULE value '" << schedule_type << "'\n"; - exit(EXIT_FAILURE); -} -} // namespace cuda::experimental::stf::reserved diff --git a/cudax/include/cuda/experimental/__stf/internal/task.cuh b/cudax/include/cuda/experimental/__stf/internal/task.cuh index 7bf88e1daf3..a1786defcf9 100644 --- a/cudax/include/cuda/experimental/__stf/internal/task.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/task.cuh @@ -38,14 +38,6 @@ namespace cuda::experimental::stf { -namespace reserved -{ -class mapping_id_tag -{}; - -using mapping_id_t = reserved::unique_id; -} // end namespace reserved - class backend_ctx_untyped; class logical_data_untyped; class exec_place; @@ -136,9 +128,6 @@ private: // Used to uniquely identify the task reserved::unique_id_t unique_id; - // Used to uniquely identify the task for mapping purposes - reserved::mapping_id_t mapping_id; - // RAII guard for the task's execution place activation. // Created in acquire_deps, destroyed in release_deps. // Empty (inactive) scope when not in use. @@ -373,12 +362,6 @@ public: return pimpl->unique_id; } - // Get the unique task mapping identifier - int get_mapping_id() const - { - return pimpl->mapping_id; - } - size_t hash() const { return ::std::hash()(pimpl.get()); @@ -553,12 +536,7 @@ public: data_instance(bool used, data_place dplace) : used(used) , dplace(mv(dplace)) - { -#if 0 - // Since this will default construct a task, we need to decrement the id - reserved::mapping_id_t::decrement_id(); -#endif - } + {} void set_used(bool flag) { diff --git a/cudax/include/cuda/experimental/__stf/internal/task_dep.cuh b/cudax/include/cuda/experimental/__stf/internal/task_dep.cuh index d24edc4980b..d862ec19ea4 100644 --- a/cudax/include/cuda/experimental/__stf/internal/task_dep.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/task_dep.cuh @@ -146,16 +146,6 @@ public: return symbol; } - void set_data_footprint(size_t f) const - { - data_footprint = f; - } - - size_t get_data_footprint() const - { - return data_footprint; - } - void reset_logical_data() { data = nullptr; @@ -171,7 +161,6 @@ private: // setting them only during scheduling (since task_dep can only be accessed // as const ref) mutable ::std::string symbol; - mutable size_t data_footprint = 0; mutable data_place dplace; ::std::shared_ptr redux_op; diff --git a/cudax/include/cuda/experimental/__stf/internal/task_statistics.cuh b/cudax/include/cuda/experimental/__stf/internal/task_statistics.cuh deleted file mode 100644 index 628a0d2b1af..00000000000 --- a/cudax/include/cuda/experimental/__stf/internal/task_statistics.cuh +++ /dev/null @@ -1,301 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// Part of CUDASTF in CUDA C++ Core Libraries, -// under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. -// -//===----------------------------------------------------------------------===// - -/** - * @file - * - * @brief Implements the tracking and recording of task statistics. Each task is uniquely - * identified by the user set task symbol and the size of its dependencies. Can be enabled - * by setting the CUDASTF_CALIBRATION_FILE environment variable to point to the file which - * will store the results. - * - * - * CUDASTF_CALIBRATION_FILE - */ - -#pragma once - -#include - -#if defined(_CCCL_IMPLICIT_SYSTEM_HEADER_GCC) -# pragma GCC system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_CLANG) -# pragma clang system_header -#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_MSVC) -# pragma system_header -#endif // no system header - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace cuda::experimental::stf::reserved -{ -/** - * @brief This class stores statistics about task execution time - */ -class task_statistics : public reserved::meyers_singleton -{ -protected: - task_statistics() - { - const char* filename = ::std::getenv("CUDASTF_CALIBRATION_FILE"); - - if (!filename) - { - calibrating = false; - return; - } - - calibrating = true; - calibration_file = ::std::string(filename); - } - - ~task_statistics() - { - if (!calibration_file.empty()) - { - write_stats(); - } - } - -public: - bool is_calibrating() const - { - return calibrating; - } - - bool is_calibrating_to_file() const - { - return !calibration_file.empty(); - } - - /** - * @brief Enable online calibration - */ - void enable_calibration() - { - calibrating = true; - } - - template - void log_task_time(const task_type& t, double time) - { - auto key = ::std::pair{t.get_symbol(), get_data_footprint(t)}; - - auto it = statistics.find(key); - if (it == statistics.end()) - { - statistics.emplace(key, statistic(time)); - } - else - { - it->second.update(time); - } - } - - class statistic - { - public: - statistic() = delete; - explicit statistic(double initial_time) - : num_calls(1) - , mean(initial_time) - , squares(0.0) - {} - - // Constructor when reading from file - statistic(int num_calls, double mean, double stddev) - : num_calls(num_calls) - , mean(mean) - , stddev(stddev) - {} - - void update(double new_time) - { - // Use Welford's method to compute mean https://stackoverflow.com/a/15638726 - num_calls++; - - double delta = new_time - mean; - - mean += delta / num_calls; - squares += delta * (new_time - mean); - - double variance = (num_calls == 1 ? 0 : squares / (num_calls - 1)); - stddev = ::std::sqrt(variance); - } - - int get_num_calls() const - { - return num_calls; - } - double get_mean() const - { - return mean; - } - double get_stddev() const - { - return stddev; - } - - private: - int num_calls = 0; - double mean = 0.0; - double squares = 0.0; - - // Only used when reading file - double stddev = 0.0; - }; - - using statistics_map_key_t = ::std::pair<::std::string, size_t>; - using statistics_map_t = - ::std::unordered_map>; - - void read_statistics_file(const char* filename) - { - _CCCL_ASSERT(!is_calibrating(), "Cannot calibrate if we read task statistics from a file."); - - ::std::ifstream file(filename); - EXPECT(file, "Failed to read statistics file '", filename, "'."); - - int current_line = 0; - for (::std::string line; ::std::getline(file, line); ++current_line) - { - if (current_line == 0) // This is the csv header - { - continue; - } - - ::std::stringstream ss(line); - - ::std::string task_name; - size_t size = 0; - double time = -1.0; - int num_calls = -1; - double stddev = -1.0; - - int column = 0; - for (::std::string cell; ::std::getline(ss, cell, ','); ++column) - { - EXPECT(column < 5, "Invalid number of columns in statistics file ", column + 1, " (expected 5)."); - - if (column == 0) - { - task_name = cell; - } - else if (column == 1) - { - size = ::std::stoul(cell); - } - else if (column == 2) - { - num_calls = ::std::stoi(cell); - } - else if (column == 3) - { - time = ::std::stod(cell); - } - else if (column == 4) - { - stddev = ::std::stod(cell); - } - } - - EXPECT(time >= 0, "Invalid time value '", time, "' provided on line '", current_line, "'."); - EXPECT(num_calls >= 0, "Invalid num_calls value '", num_calls, "' provided on line '", current_line, "'."); - EXPECT(stddev >= 0, "Invalid stddev value '", stddev, "' provided on line '", current_line, "'."); - - ::std::pair<::std::string, size_t> key(task_name, size); - statistics.emplace(key, statistic(num_calls, time, stddev)); - } - } - - /** - * @brief Get statistics associated with a specific task - * - * @tparam Type of task - * @param The specified task - * @return A pair of the task time and the number of calls so far - */ - template - ::std::pair get_task_stats(const task_type& t) - { - const ::std::string& task_name = t.get_symbol(); - size_t data_footprint = get_data_footprint(t); - - ::std::pair<::std::string, size_t> key(task_name, data_footprint); - auto it = statistics.find(key); - if (it != statistics.end()) - { - const statistic& s = it->second; - return {s.get_mean(), s.get_num_calls()}; - } - - // If we do not have the task in the map, this means we have to be calibrating online. - // A missing task implies an incomplete stats file was provided - EXPECT(is_calibrating(), "Task '", task_name, "' with size ", data_footprint, " not provided in stats file."); - - return {0.0, 0}; - } - -private: - ::std::string calibration_file; - bool calibrating = false; - - template - size_t get_data_footprint(const task_type& t) const - { - size_t data_footprint = 0; - const auto deps = t.get_task_deps(); - - for (auto it = deps.begin(); it < deps.end(); it++) - { - size_t new_size = it->get_data_footprint(); - data_footprint += new_size; - assert(data_footprint >= new_size); // Check for overflow - } - - return data_footprint; - } - - void write_stats() const - { - ::std::ofstream file(calibration_file); - if (!file) - { - ::std::cerr << "Failed to write to calibration file '" << calibration_file << "'.\n"; - return; - } - - file << "task,size,num_calls,mean,stddev\n"; - for (const auto& [key, value] : statistics) - { - double stddev = value.get_stddev(); - file << key.first << "," << key.second << "," << value.get_num_calls() << "," << value.get_mean() << "," << stddev - << '\n'; - } - - file.close(); - if (file.rdstate() & ::std::ifstream::failbit) - { - ::std::cerr << "ERROR: Closing calibration file failed.\n"; - } - } - - statistics_map_t statistics; -}; -} // namespace cuda::experimental::stf::reserved diff --git a/cudax/include/cuda/experimental/__stf/places/data_place_impl.cuh b/cudax/include/cuda/experimental/__stf/places/data_place_impl.cuh index e320ff8d8a9..8d24b41589a 100644 --- a/cudax/include/cuda/experimental/__stf/places/data_place_impl.cuh +++ b/cudax/include/cuda/experimental/__stf/places/data_place_impl.cuh @@ -13,7 +13,7 @@ * @brief Concrete implementations of data_place_interface * * This file contains implementations for standard data place types: - * host, managed, device, invalid, affine, and device_auto. + * host, managed, device, invalid, and affine. */ #pragma once @@ -358,55 +358,4 @@ public: return false; } }; - -/** - * @brief Implementation for device_auto data place (auto-select device) - */ -class data_place_device_auto final : public data_place_interface -{ -public: - bool is_resolved() const override - { - return false; - } - - int get_device_ordinal() const override - { - return data_place_interface::device_auto; - } - - ::std::string to_string() const override - { - return "auto"; - } - - size_t hash() const override - { - return ::std::hash()(data_place_interface::device_auto); - } - - int cmp(const data_place_interface& other) const override - { - if (typeid(*this) != typeid(other)) - { - return typeid(*this).before(typeid(other)) ? -1 : 1; - } - return 0; - } - - void* allocate(::std::ptrdiff_t, cudaStream_t) const override - { - throw ::std::logic_error("Cannot allocate from device_auto data_place directly"); - } - - void deallocate(void*, size_t, cudaStream_t) const override - { - throw ::std::logic_error("Cannot deallocate from device_auto data_place directly"); - } - - bool allocation_is_stream_ordered() const override - { - return true; - } -}; } // end namespace cuda::experimental::stf diff --git a/cudax/include/cuda/experimental/__stf/places/data_place_interface.cuh b/cudax/include/cuda/experimental/__stf/places/data_place_interface.cuh index 9586a037221..62ee9a58950 100644 --- a/cudax/include/cuda/experimental/__stf/places/data_place_interface.cuh +++ b/cudax/include/cuda/experimental/__stf/places/data_place_interface.cuh @@ -72,12 +72,11 @@ public: */ enum ord : int { - invalid = ::std::numeric_limits::min(), - composite = -5, - device_auto = -4, - affine = -3, - managed = -2, - host = -1, + invalid = ::std::numeric_limits::min(), + composite = -5, + affine = -3, + managed = -2, + host = -1, }; // === Core properties === @@ -88,7 +87,7 @@ public: * Returns true for places that represent a concrete memory target: * host, managed, device(N), composite, green_ctx, etc. * Returns false for abstract/deferred places that need further - * resolution: invalid, affine, device_auto. + * resolution: invalid, affine. */ virtual bool is_resolved() const = 0; @@ -100,7 +99,6 @@ public: * - data_place_ordinals::host (-1) for host * - data_place_ordinals::managed (-2) for managed * - data_place_ordinals::affine (-3) for affine - * - data_place_ordinals::device_auto (-4) for device_auto * - data_place_ordinals::composite (-5) for composite * - data_place_ordinals::invalid for invalid */ diff --git a/cudax/include/cuda/experimental/__stf/places/places.cuh b/cudax/include/cuda/experimental/__stf/places/places.cuh index 5d677a74d69..d4211647b63 100644 --- a/cudax/include/cuda/experimental/__stf/places/places.cuh +++ b/cudax/include/cuda/experimental/__stf/places/places.cuh @@ -122,15 +122,6 @@ public: return data_place(make_static_instance()); } - /** - * @brief Constant representing a placeholder that lets the library automatically select a GPU device as the - * `data_place`. - */ - static data_place device_auto() - { - return data_place(make_static_instance()); - } - /** @brief Data is placed on device with index dev_id. */ static data_place device(int dev_id = 0) { @@ -240,12 +231,6 @@ public: return typeid(ref) == typeid(data_place_device); } - bool is_device_auto() const - { - const auto& ref = *pimpl_; - return typeid(ref) == typeid(data_place_device_auto); - } - bool is_resolved() const { return pimpl_->is_resolved(); @@ -673,7 +658,6 @@ public: * for example exec_place::host or exec_place::device(4). */ static exec_place host(); - static exec_place device_auto(); static exec_place device(int devid); @@ -998,47 +982,6 @@ inline exec_place exec_place::host() return exec_place(make_static_instance()); } -// Implementation for device_auto placeholder -class exec_place_device_auto_impl : public exec_place::impl -{ -public: - exec_place_device_auto_impl() - : exec_place::impl(data_place::device_auto()) - {} - - exec_place activate(size_t) const override - { - throw ::std::logic_error("activate() called on device_auto exec_place - should be resolved first"); - } - - void deactivate(const exec_place&, size_t) const override - { - throw ::std::logic_error("deactivate() called on device_auto exec_place - should be resolved first"); - } - - bool is_device() const override - { - return true; - } - - ::std::shared_ptr get_place(size_t idx) override - { - _CCCL_ASSERT(idx == 0, "Index out of bounds for device_auto exec_place"); - // Static instance - use no-op deleter instead of shared_from_this() - return ::std::shared_ptr(this, [](impl*) {}); - } - - ::std::string to_string() const override - { - return "device_auto"; - } -}; - -inline exec_place exec_place::device_auto() -{ - return make_static_instance(); -} - UNITTEST("exec_place::host operator->*") { bool witness = false; @@ -1355,7 +1298,7 @@ inline exec_place data_place::affine_exec_place() const return exec_place(::std::static_pointer_cast(custom_impl)); } - // For invalid, affine, device_auto - throw + // For invalid, affine - throw throw ::std::logic_error("affine_exec_place() not meaningful for data_place type with ordinal " + ::std::to_string(pimpl_->get_device_ordinal())); } diff --git a/cudax/include/cuda/experimental/__stf/stream/stream_ctx.cuh b/cudax/include/cuda/experimental/__stf/stream/stream_ctx.cuh index 8a771a930d7..dfae92166ea 100644 --- a/cudax/include/cuda/experimental/__stf/stream/stream_ctx.cuh +++ b/cudax/include/cuda/experimental/__stf/stream/stream_ctx.cuh @@ -33,7 +33,6 @@ #include #include #include -#include #include // for unit test! #include // For implicit logical_data_untyped constructors #include @@ -178,40 +177,9 @@ public: template stream_task task(exec_place e_place, task_dep... deps) { - EXPECT(state().deferred_tasks.empty(), "Mixing deferred and immediate tasks is not supported yet."); return stream_task(*this, mv(e_place), mv(deps)...); } - template - deferred_stream_task deferred_task(exec_place e_place, task_dep... deps) - { - auto result = deferred_stream_task(*this, mv(e_place), mv(deps)...); - - int id = result.get_mapping_id(); - state().deferred_tasks.push_back(id); - state().task_map.emplace(id, result); - - return result; - } - - template - deferred_stream_task deferred_task(task_dep... deps) - { - return deferred_task(exec_place::current_device(), mv(deps)...); - } - - template - auto deferred_host_launch(task_dep... deps) - { - auto result = deferred_host_launch_scope(this, mv(deps)...); - int id = result.get_mapping_id(); - - state().deferred_tasks.push_back(id); - state().task_map.emplace(id, result); - - return result; - } - cudaStream_t fence() { const auto& user_dstream = state().user_dstream; @@ -231,326 +199,34 @@ public: return dstream.stream; } - /* - * host_launch : launch a "kernel" in a callback - */ - - template - class deferred_parallel_for_scope : public deferred_stream_task<> - { - struct payload_t : public deferred_stream_task<>::payload_t - { - payload_t(stream_ctx& ctx, exec_place e_place, shape_t shape, task_dep... deps) - : task(ctx, mv(e_place), mv(shape), mv(deps)...) - {} - - reserved::parallel_for_scope task; - ::std::function&)> todo; - - void set_symbol(::std::string s) override - { - task.set_symbol(mv(s)); - } - - const ::std::string& get_symbol() const override - { - return task.get_symbol(); - } - - int get_mapping_id() const override - { - return task.get_mapping_id(); - } - - void run() override - { - todo(task); - } - - void populate_deps_scheduling_info() override - { - // Error checking copied from acquire() in acquire_release() - - int index = 0; - const auto& deps = get_task_deps(); - for (const auto& dep : deps) - { - if (!dep.get_data().is_initialized()) - { - fprintf(stderr, "Error: dependency number %d is an uninitialized logical data.\n", index); - abort(); - } - dep.set_symbol(dep.get_data().get_symbol()); - dep.set_data_footprint(dep.get_data().get_data_interface().data_footprint()); - index++; - } - } - - const task_dep_vector_untyped& get_task_deps() const override - { - return task.get_task_deps(); - } - - void set_exec_place(exec_place e_place) override - { - task.set_exec_place(mv(e_place)); - } - }; - - payload_t& my_payload() const - { - // Safe to do the cast because we've set the pointer earlier ourselves - return *static_cast(payload.get()); - } - - public: - deferred_parallel_for_scope(stream_ctx& ctx, exec_place e_place, shape_t shape, task_dep... deps) - : deferred_stream_task<>(::std::make_shared(ctx, mv(e_place), mv(shape), mv(deps)...)) - {} - - ///@{ - /** - * @name Set the symbol of the task. This is used for profiling and debugging. - * - * @param s - * @return deferred_parallel_for_scope& - */ - deferred_parallel_for_scope& set_symbol(::std::string s) & - { - payload->set_symbol(mv(s)); - return *this; - } - - deferred_parallel_for_scope&& set_symbol(::std::string s) && - { - set_symbol(mv(s)); - return mv(*this); - } - ///@} - - void populate_deps_scheduling_info() - { - payload->populate_deps_scheduling_info(); - } - - template - void operator->*(Fun fun) - { - my_payload().todo = [f = mv(fun)](reserved::parallel_for_scope& task) { - task->*f; - }; - } - }; - - template - class deferred_host_launch_scope : public deferred_stream_task<> - { - struct payload_t : public deferred_stream_task<>::payload_t - { - payload_t(stream_ctx& ctx, task_dep... deps) - : task(ctx, mv(deps)...) - {} - - reserved::host_launch_scope task; - ::std::function&)> todo; - - void set_symbol(::std::string s) override - { - task.set_symbol(s); - } - - const ::std::string& get_symbol() const override - { - return task.get_symbol(); - } - - int get_mapping_id() const override - { - return task.get_mapping_id(); - } - - void run() override - { - todo(task); - } - - void populate_deps_scheduling_info() override - { - int index = 0; - const auto& deps = get_task_deps(); - for (const auto& dep : deps) - { - if (!dep.get_data().is_initialized()) - { - fprintf(stderr, "Error: dependency number %d is an uninitialized logical data.\n", index); - abort(); - } - dep.set_symbol(dep.get_data().get_symbol()); - dep.set_data_footprint(dep.get_data().get_data_interface().data_footprint()); - index++; - } - } - - const task_dep_vector_untyped& get_task_deps() const override - { - return task.get_task_deps(); - } - - void set_exec_place(exec_place) override {} - }; - - payload_t& my_payload() const - { - // Safe to do the cast because we've set the pointer earlier ourselves - return *static_cast(payload.get()); - } - - public: - deferred_host_launch_scope(stream_ctx& ctx, task_dep... deps) - : deferred_stream_task<>(::std::make_shared(ctx, mv(deps)...)) - {} - - ///@{ - /** - * @name Set the symbol of the task. This is used for profiling and debugging. - * - * @param s - * @return deferred_host_launch_scope& - */ - deferred_host_launch_scope& set_symbol(::std::string s) & - { - payload->set_symbol(mv(s)); - return *this; - } - - deferred_host_launch_scope&& set_symbol(::std::string s) && - { - set_symbol(mv(s)); - return mv(*this); - } - ///@} - - void populate_deps_scheduling_info() - { - payload->populate_deps_scheduling_info(); - } - - template - void operator->*(Fun fun) - { - my_payload().todo = [f = mv(fun)](reserved::host_launch_scope& task) { - task->*f; - }; - } - }; + // no-op for stream_ctx, needed so that context (variant of stream_ctx/graph_ctx) can dispatch submit() + void submit() {} void finalize() { _CCCL_ASSERT(get_phase() < backend_ctx_untyped::phase::finalized, ""); auto& state = this->state(); - if (!state.submitted_stream) - { - // Wasn't submitted yet - submit(); - assert(state.submitted_stream); - } + + cudaStream_t submitted_stream = fence(); + + // Write-back data and erase automatically created data instances + state.erase_all_logical_data(); + state.detach_allocators(*this); // Make sure we release resources attached to this context - state.release_ctx_resources(state.submitted_stream); + state.release_ctx_resources(submitted_stream); if (state.blocking_finalize) { - cuda_safe_call(cudaStreamSynchronize(state.submitted_stream)); + cuda_safe_call(cudaStreamSynchronize(submitted_stream)); } state.cleanup(); set_phase(backend_ctx_untyped::phase::finalized); } - float get_submission_time_ms() const - { - assert(state().submitted_stream); - return state().submission_time; - } - - void submit() - { - auto& state = this->state(); - _CCCL_ASSERT(!state.submitted_stream, ""); - _CCCL_ASSERT(get_phase() < backend_ctx_untyped::phase::submitted, ""); - - cudaEvent_t startEvent = nullptr; - cudaEvent_t stopEvent = nullptr; - - ::std::unordered_map payloads; - if (reordering_tasks()) - { - build_task_graph(); - for (int id : state.deferred_tasks) - { - const auto& t = state.task_map.at(id); - payloads.emplace(id, t.get_reorderer_payload()); - } - reorder_tasks(state.deferred_tasks, payloads); - - for (auto& [id, payload] : payloads) - { - if (payload.device != -1) - { - state.task_map.at(id).set_exec_place(exec_place::device(payload.device)); - } - } - - cuda_safe_call(cudaSetDevice(0)); - cuda_safe_call(cudaStreamSynchronize(fence())); - cuda_safe_call(cudaEventCreate(&startEvent)); - cuda_safe_call(cudaEventCreate(&stopEvent)); - cuda_safe_call(cudaEventRecord(startEvent, fence())); - } - - for (int id : state.deferred_tasks) - { - auto& task = state.task_map.at(id); - task.run(); - } - - if (reordering_tasks()) - { - cuda_safe_call(cudaSetDevice(0)); - cuda_safe_call(cudaEventRecord(stopEvent, fence())); - cuda_safe_call(cudaEventSynchronize(stopEvent)); - cuda_safe_call(cudaEventElapsedTime(&state.submission_time, startEvent, stopEvent)); - } - - // Write-back data and erase automatically created data instances - state.erase_all_logical_data(); - state.detach_allocators(*this); - - state.submitted_stream = fence(); - assert(state.submitted_stream != nullptr); - - set_phase(backend_ctx_untyped::phase::submitted); - } - // no-op : so that we can use the same code with stream_ctx and graph_ctx void change_stage() {} - template - auto deferred_parallel_for(exec_place e_place, S shape, task_dep... deps) - { - auto result = deferred_parallel_for_scope(*this, mv(e_place), mv(shape), mv(deps)...); - int id = result.get_mapping_id(); - state().deferred_tasks.push_back(id); - state().task_map.emplace(id, result); - - return result; - } - - template - auto deferred_parallel_for(S shape, task_dep... deps) - { - return deferred_parallel_for(exec_place::current_device(), mv(shape), mv(deps)...); - } - template auto wait(cuda::experimental::stf::logical_data& ldata) { @@ -578,10 +254,6 @@ private: void cleanup() { - // Reset this object - deferred_tasks.clear(); - task_map.clear(); - submitted_stream = nullptr; base::impl::cleanup(); } @@ -609,11 +281,6 @@ private: return true; } - ::std::vector deferred_tasks; // vector of mapping_ids - ::std::unordered_map> task_map; // maps from a mapping_id to the deferred_task - cudaStream_t submitted_stream = nullptr; // stream used in submit - float submission_time = 0.0; - // If the context is attached to a user stream, we should use it for // finalize() or fence() ::std::optional user_dstream; @@ -631,91 +298,6 @@ private: { return dynamic_cast(get_state()); } - - /// @brief Build the task graph by populating the predecessor and successor lists. - /// The logic here is copied from acquire_release.h notify_access(), although this doesn't handle redux at the - /// moment - void build_task_graph() - { - auto& state = this->state(); - - // Maps from a logical data to its last writer. The int is the mapping_id - ::std::unordered_map<::std::string, ::std::deque> current_readers; - ::std::unordered_map<::std::string, int> current_writer, previous_writer; - - for (int id : state.deferred_tasks) - { - auto& t = state.task_map.at(id); - assert(id == t.get_mapping_id()); - - for (const auto& dep : t.get_task_deps()) - { - const access_mode mode = dep.get_access_mode(); - - const logical_data_untyped data = dep.get_data(); - const auto& symbol = data.get_symbol(); - const auto it = current_writer.find(symbol); - const bool write = mode == access_mode::rw || mode == access_mode::write; - - if (write) - { - if (it == current_writer.end()) - { // WAR - if (auto readers_it = current_readers.find(symbol); readers_it != current_readers.end()) - { - for (auto& readers_queue = readers_it->second; !readers_queue.empty();) - { - const int reader_id = readers_queue.back(); - readers_queue.pop_back(); - - auto& reader_task = state.task_map.at(reader_id); - t.add_predecessor(reader_id); - reader_task.add_successor(id); - } - } - } - else - { // WAW - const int writer_id = it->second; - auto& writer_task = state.task_map.at(writer_id); - - t.add_predecessor(writer_id); - writer_task.add_successor(id); - - previous_writer[symbol] = writer_id; - } - current_writer[symbol] = id; - } - else - { - current_readers[symbol].emplace_back(id); - if (it == current_writer.end()) - { // RAR - - auto previous_writer_it = previous_writer.find(symbol); - if (previous_writer_it != previous_writer.end()) - { - const int previous_writer_id = previous_writer_it->second; - auto& previous_writer_task = state.task_map.at(previous_writer_id); - - t.add_predecessor(previous_writer_id); - previous_writer_task.add_successor(id); - } - } - else - { // RAW - const int writer_id = it->second; - auto& writer_task = state.task_map.at(writer_id); - previous_writer[symbol] = writer_id; - current_writer.erase(symbol); - - t.add_predecessor(writer_id); - writer_task.add_successor(id); - } - } - } - } - } }; #ifdef UNITTESTED_FILE @@ -795,7 +377,6 @@ UNITTEST("copyable stream_ctx") auto t = ctx.task(); auto t2 = ctx2.task(); - ctx2.submit(); ctx2.finalize(); }; @@ -806,7 +387,6 @@ UNITTEST("movable stream_ctx") stream_ctx ctx2 = mv(ctx); auto t2 = ctx2.task(); - ctx2.submit(); ctx2.finalize(); }; diff --git a/cudax/include/cuda/experimental/__stf/stream/stream_task.cuh b/cudax/include/cuda/experimental/__stf/stream/stream_task.cuh index f4dc147a02f..76a13d1e022 100644 --- a/cudax/include/cuda/experimental/__stf/stream/stream_task.cuh +++ b/cudax/include/cuda/experimental/__stf/stream/stream_task.cuh @@ -328,52 +328,12 @@ public: } } - void populate_deps_scheduling_info() const - { - // Error checking copied from acquire() in acquire_release() - - int index = 0; - const auto& deps = get_task_deps(); - for (const auto& dep : deps) - { - if (!dep.get_data().is_initialized()) - { - fprintf(stderr, "Error: dependency number %d is an uninitialized logical data.\n", index); - abort(); - } - dep.set_symbol(dep.get_data().get_symbol()); - dep.set_data_footprint(dep.get_data().get_data_interface().data_footprint()); - index++; - } - } - /** - * @brief Use the scheduler to assign a device to this task - * - * @return returns true if the task's time needs to be recorded + * @brief Determine if the task's time needs to be recorded (for DOT visualization) */ - bool schedule_task() + bool should_record_time() { - reserved::dot& dot = reserved::dot::instance(); - auto& statistics = reserved::task_statistics::instance(); - - const bool is_auto = get_exec_place().affine_data_place().is_device_auto(); - bool calibrate = false; - - // We need to know the data footprint if scheduling or calibrating tasks - if (is_auto || statistics.is_calibrating()) - { - populate_deps_scheduling_info(); - } - - if (is_auto) - { - auto [place, needs_calibration] = ctx.schedule_task(*this); - set_exec_place(place); - calibrate = needs_calibration; - } - - return dot.is_timing() || (calibrate && statistics.is_calibrating()); + return reserved::dot::instance().is_timing(); } private: @@ -524,25 +484,17 @@ public: template auto operator->*(Fun&& fun) { - // Apply function to the stream (in the first position) and the data tuple - auto& dot = ctx.get_dot(); - auto& statistics = reserved::task_statistics::instance(); + auto& dot = ctx.get_dot(); cudaEvent_t start_event, end_event; - bool record_time = schedule_task(); - - if (statistics.is_calibrating_to_file()) - { - record_time = true; - } + bool record_time = should_record_time(); nvtx_range nr(get_symbol().c_str()); start(); if (record_time) { - // Events must be created here to avoid issues with multi-gpu cuda_safe_call(cudaEventCreate(&start_event)); cuda_safe_call(cudaEventCreate(&end_event)); cuda_safe_call(cudaEventRecord(start_event, get_stream())); @@ -564,11 +516,6 @@ public: { dot->template add_vertex_timing(*this, milliseconds); } - - if (statistics.is_calibrating()) - { - statistics.log_task_time(*this, milliseconds); - } } clear(); @@ -616,272 +563,4 @@ private: }); } }; - -/* - * @brief Deferred tasks are tasks that are not executed immediately, but rather upon `ctx.submit()`. - */ -template -class deferred_stream_task; - -#ifndef _CCCL_DOXYGEN_INVOKED // doxygen has issues with this code -/* - * Base of all deferred tasks. Stores the needed information for typed deferred tasks to run (see below). - */ -template <> -class deferred_stream_task<> -{ -protected: - // Type stored - struct payload_t - { - virtual ~payload_t() = default; - virtual void set_symbol(::std::string s) = 0; - virtual const ::std::string& get_symbol() const = 0; - virtual int get_mapping_id() const = 0; - virtual void run() = 0; - virtual void populate_deps_scheduling_info() = 0; - virtual const task_dep_vector_untyped& get_task_deps() const = 0; - virtual void set_exec_place(exec_place e_place) = 0; - - void add_successor(int succ) - { - successors.insert(succ); - } - void add_predecessor(int pred) - { - predecessors.insert(pred); - } - - const ::std::unordered_set& get_successors() const - { - return successors; - } - const ::std::unordered_set& get_predecessors() const - { - return predecessors; - } - - virtual void set_cost(double c) - { - assert(c >= 0.0); - cost = c; - } - virtual double get_cost() const - { - assert(cost >= 0.0); - return cost; - } - - private: - // Sets of mapping ids - ::std::unordered_set predecessors; - ::std::unordered_set successors; - double cost = -1.0; - }; - - ::std::shared_ptr payload; - - deferred_stream_task(::std::shared_ptr payload) - : payload(mv(payload)) - {} - -public: - void run() - { - assert(payload); - payload->run(); - } - - const task_dep_vector_untyped& get_task_deps() const - { - assert(payload); - return payload->get_task_deps(); - } - - void add_successor(int succ) - { - assert(payload); - payload->add_successor(succ); - } - - void add_predecessor(int pred) - { - assert(payload); - payload->add_predecessor(pred); - } - - const ::std::unordered_set& get_successors() const - { - assert(payload); - return payload->get_successors(); - } - - const ::std::unordered_set& get_predecessors() const - { - assert(payload); - return payload->get_predecessors(); - } - - const ::std::string& get_symbol() const - { - assert(payload); - return payload->get_symbol(); - } - - void set_symbol(::std::string s) const - { - return payload->set_symbol(mv(s)); - } - - int get_mapping_id() const - { - assert(payload); - return payload->get_mapping_id(); - } - - double get_cost() const - { - assert(payload); - return payload->get_cost(); - } - - void set_cost(double cost) - { - assert(payload); - payload->set_cost(cost); - } - - auto get_reorderer_payload() const - { - assert(payload); - payload->populate_deps_scheduling_info(); - return reserved::reorderer_payload( - payload->get_symbol(), - payload->get_mapping_id(), - payload->get_successors(), - payload->get_predecessors(), - payload->get_task_deps()); - } - - void set_exec_place(exec_place e_place) - { - assert(payload); - payload->set_exec_place(e_place); - } -}; - -/** - * @brief Deferred tasks are tasks that are not executed immediately, but rather upon `ctx.submit()`. This allows - * the library to perform optimizations on the task graph before it is executed. - * - * @tparam Data The dependencies of the task - */ -template -class deferred_stream_task : public deferred_stream_task<> -{ - struct payload_t : public deferred_stream_task<>::payload_t - { - template - payload_t(backend_ctx_untyped ctx, exec_place e_place, task_dep... deps) - : task(mv(ctx), mv(e_place)) - { - task.add_deps(mv(deps)...); - } - - // Untyped task information. Will be needed later for launching the task. - stream_task<> task; - // Function that launches the task. - ::std::function&)> todo; - // More data could go here - - void set_symbol(::std::string s) override - { - task.set_symbol(mv(s)); - } - - const ::std::string& get_symbol() const override - { - return task.get_symbol(); - } - - int get_mapping_id() const override - { - return task.get_mapping_id(); - } - - void run() override - { - todo(task); - } - - void populate_deps_scheduling_info() override - { - task.populate_deps_scheduling_info(); - } - - const task_dep_vector_untyped& get_task_deps() const override - { - return task.get_task_deps(); - } - - void set_exec_place(exec_place e_place) override - { - task.set_exec_place(e_place); - } - }; - - payload_t& my_payload() const - { - // Safe to do the cast because we've set the pointer earlier ourselves - return *static_cast(payload.get()); - } - -public: - /** - * @brief Construct a new deferred stream task object from a context, execution place, and dependencies. - * - * @param ctx the parent context - * @param e_place the place where the task will execute - * @param deps task dependencies - */ - deferred_stream_task(backend_ctx_untyped ctx, exec_place e_place, task_dep... deps) - : deferred_stream_task<>(::std::make_shared(mv(ctx), mv(e_place), mv(deps)...)) - {} - - ///@{ - /** - * @name Set the symbol of the task. This is used for profiling and debugging. - * - * @param s - * @return deferred_stream_task& - */ - deferred_stream_task& set_symbol(::std::string s) & - { - payload->set_symbol(mv(s)); - return *this; - } - - deferred_stream_task&& set_symbol(::std::string s) && - { - set_symbol(mv(s)); - return mv(*this); - } - ///@} - - void populate_deps_scheduling_info() - { - payload->populate_deps_scheduling_info(); - } - - template - void operator->*(Fun fun) - { - my_payload().todo = [f = mv(fun)](stream_task<>& untyped_task) { - // Here we have full type info; we can downcast to typed stream_task - auto& task = static_cast&>(untyped_task); - task.operator->*(f); - }; - } -}; -#endif // _CCCL_DOXYGEN_INVOKED } // namespace cuda::experimental::stf