Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ target_link_libraries(cugraph
rmm::rmm
raft::raft
$<BUILD_LOCAL_INTERFACE:CUDA::toolkit>
cuda
PRIVATE
${COMPILED_RAFT_LIB}
cuco::cuco
Expand Down
3 changes: 3 additions & 0 deletions cpp/cmake/thirdparty/get_cccl.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# This function finds CCCL and sets any additional necessary environment variables.
# Fetches and configures CCCL (CUB/Thrust/libcu++) for cugraph via rapids-cmake/CPM.
function(find_and_configure_cccl)
# rapids-cmake CPM helper for CCCL, plus the package-override helper used below.
include(${rapids-cmake-dir}/cpm/cccl.cmake)
include(${rapids-cmake-dir}/cpm/package_override.cmake)
# Redirect/pin the CCCL package via a JSON override file located next to this
# module. NOTE(review): assumes cccl_override.json exists in this directory — confirm.
rapids_cpm_package_override("${CMAKE_CURRENT_FUNCTION_LIST_DIR}/cccl_override.json")
# Opt in to CCCL's unstable/experimental components before configuring the package.
# Presumably this gates cuda::experimental (the <cuda/experimental/stf.cuh> header
# newly included by the prims sources in this change) — verify against CCCL docs.
# A plain function-scope set() suffices here because rapids_cpm_cccl() configures
# CCCL within this same scope, where the variable is visible.
set(CCCL_ENABLE_UNSTABLE ON)
# Fetch/configure CCCL and record it in cugraph's build and install export sets.
rapids_cpm_cccl(BUILD_EXPORT_SET cugraph-exports INSTALL_EXPORT_SET cugraph-exports)
endfunction()

Expand Down
44 changes: 36 additions & 8 deletions cpp/src/prims/detail/per_v_transform_reduce_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
#include <type_traits>
#include <utility>

#include <cuda/experimental/stf.cuh>
#include <raft/core/resource/custom_resource.hpp>
using namespace cuda::experimental::stf;

namespace cugraph {

namespace detail {
Expand Down Expand Up @@ -1161,6 +1165,14 @@ void per_v_transform_reduce_e_edge_partition(
std::optional<raft::host_span<size_t const>> key_segment_offsets,
std::optional<raft::host_span<size_t const>> const& edge_partition_stream_pool_indices)
{
async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource<async_resources_handle>(handle);
stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle);
token output_tokens[4];
for (size_t i = 0; i < 4; i++)
{
output_tokens[i] = cudastf_ctx.token();
}

constexpr bool use_input_key = !std::is_same_v<OptionalKeyIterator, void*>;

using vertex_t = typename GraphViewType::vertex_type;
Expand All @@ -1184,10 +1196,12 @@ void per_v_transform_reduce_e_edge_partition(

if constexpr (update_major && !use_input_key) { // this is necessary as we don't visit
// every vertex in the hypersparse segment
thrust::fill(rmm::exec_policy_nosync(exec_stream),
cudastf_ctx.task(output_tokens[3].write())->*[=](cudaStream_t stream) {
thrust::fill(rmm::exec_policy_nosync(stream),
output_buffer + (*key_segment_offsets)[3],
output_buffer + (*key_segment_offsets)[4],
major_init);
};
}

auto segment_size = use_input_key
Expand All @@ -1197,8 +1211,9 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_thread_t update_grid(segment_size,
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; }
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; token_idx +=3; }
auto segment_key_first = edge_partition_key_first;
auto segment_key_last = edge_partition_key_last;
if constexpr (use_input_key) {
Expand All @@ -1209,8 +1224,9 @@ void per_v_transform_reduce_e_edge_partition(
assert(segment_key_first == nullptr);
assert(segment_key_last == nullptr);
}
cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_hypersparse<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
segment_key_first,
segment_key_last,
Expand All @@ -1223,6 +1239,7 @@ void per_v_transform_reduce_e_edge_partition(
major_init,
reduce_op,
pred_op);
};
}
}
if ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]) {
Expand All @@ -1233,8 +1250,9 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_thread_t update_grid((*key_segment_offsets)[3] - (*key_segment_offsets)[2],
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; }
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; token_idx += 2; }
std::optional<segment_key_iterator_t>
segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor
// is a deleted function, segment_key_first should always have a value
Expand All @@ -1244,8 +1262,9 @@ void per_v_transform_reduce_e_edge_partition(
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
*segment_key_first += (*key_segment_offsets)[2];
cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_low_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]),
Expand All @@ -1258,6 +1277,7 @@ void per_v_transform_reduce_e_edge_partition(
major_init,
reduce_op,
pred_op);
};
}
if ((*key_segment_offsets)[2] - (*key_segment_offsets)[1] > 0) {
auto exec_stream = edge_partition_stream_pool_indices
Expand All @@ -1267,8 +1287,10 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_warp_t update_grid((*key_segment_offsets)[2] - (*key_segment_offsets)[1],
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; }

if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; token_idx += 1;}
std::optional<segment_key_iterator_t>
segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor
// is a deleted function, segment_key_first should always have a value
Expand All @@ -1278,8 +1300,9 @@ void per_v_transform_reduce_e_edge_partition(
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
*segment_key_first += (*key_segment_offsets)[1];
cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_mid_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + ((*key_segment_offsets)[2] - (*key_segment_offsets)[1]),
Expand All @@ -1293,6 +1316,7 @@ void per_v_transform_reduce_e_edge_partition(
major_identity_element,
reduce_op,
pred_op);
};
}
if ((*key_segment_offsets)[1] > 0) {
auto exec_stream = edge_partition_stream_pool_indices
Expand All @@ -1313,8 +1337,9 @@ void per_v_transform_reduce_e_edge_partition(
} else {
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
cudastf_ctx.task(output_tokens[0].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_high_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + (*key_segment_offsets)[1],
Expand All @@ -1328,7 +1353,10 @@ void per_v_transform_reduce_e_edge_partition(
major_identity_element,
reduce_op,
pred_op);
};
}

cudastf_ctx.finalize();
} else {
auto exec_stream = edge_partition_stream_pool_indices
? handle.get_stream_from_stream_pool(
Expand Down
45 changes: 39 additions & 6 deletions cpp/src/prims/detail/transform_v_frontier_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/thrust_tuple_utils.hpp>

#include <cuda/experimental/stf.cuh>
#include <raft/core/resource/custom_resource.hpp>

#include <raft/core/handle.hpp>

#include <cuda/std/iterator>
Expand All @@ -37,6 +40,8 @@

#include <type_traits>

using namespace cuda::experimental::stf;

namespace cugraph {

namespace detail {
Expand Down Expand Up @@ -415,6 +420,9 @@ auto transform_v_frontier_e(raft::handle_t const& handle,

auto edge_mask_view = graph_view.edge_mask_view();

async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource<async_resources_handle>(handle);
stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle);

// 1. update aggregate_local_frontier_local_degree_offsets

auto aggregate_local_frontier_local_degree_offsets =
Expand Down Expand Up @@ -508,9 +516,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
}
auto edge_partition_e_value_input = edge_partition_e_input_device_view_t(edge_value_input, i);


// CUDASTF logical data buffer for transform reduce phase
std::vector<token> l_tv_buffers(5);
for (size_t segment_i = 0; segment_i < 5; segment_i++) {
l_tv_buffers[segment_i] = cudastf_ctx.token();
}

auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i);
if (segment_offsets) {
auto [edge_partition_key_indices, edge_partition_v_frontier_partition_offsets] =
//auto [edge_partition_key_indices, edge_partition_v_frontier_partition_offsets] =
auto res_partition_v_frontier =
partition_v_frontier(
handle,
edge_partition_frontier_major_first,
Expand All @@ -520,6 +536,10 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition.major_range_first() + (*segment_offsets)[2],
edge_partition.major_range_first() + (*segment_offsets)[3]});

// We cannot capture structured binding before C++20 so we create these variables manually
auto& edge_partition_key_indices = ::std::get<0>(res_partition_v_frontier);
auto& edge_partition_v_frontier_partition_offsets = ::std::get<1>(res_partition_v_frontier);

// FIXME: we may further improve performance by 1) concurrently running kernels on different
// segments; 2) individually tuning block sizes for different segments; and 3) adding one
// more segment for very high degree vertices and running segmented reduction
Expand All @@ -529,8 +549,10 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
raft::grid_1d_block_t update_grid(high_size,
detail::transform_v_frontier_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);

cudastf_ctx.task(l_tv_buffers[0].write())->*[&](cudaStream_t stream) {
detail::transform_v_frontier_e_high_degree<GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
edge_partition_frontier_key_first,
edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[0],
Expand All @@ -542,15 +564,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition_frontier_local_degree_offsets,
e_op,
get_dataframe_buffer_begin(aggregate_value_buffer));
};
}
auto mid_size = edge_partition_v_frontier_partition_offsets[2] -
edge_partition_v_frontier_partition_offsets[1];
if (mid_size > 0) {
raft::grid_1d_warp_t update_grid(mid_size,
detail::transform_v_frontier_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
cudastf_ctx.task(l_tv_buffers[1].write())->*[&](cudaStream_t stream) {
detail::transform_v_frontier_e_mid_degree<GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
edge_partition_frontier_key_first,
edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[1],
Expand All @@ -562,15 +586,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition_frontier_local_degree_offsets,
e_op,
get_dataframe_buffer_begin(aggregate_value_buffer));
};
}
auto low_size = edge_partition_v_frontier_partition_offsets[3] -
edge_partition_v_frontier_partition_offsets[2];
if (low_size > 0) {
raft::grid_1d_thread_t update_grid(low_size,
detail::transform_v_frontier_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
cudastf_ctx.task(l_tv_buffers[2].write())->*[&](cudaStream_t stream) {
detail::transform_v_frontier_e_hypersparse_or_low_degree<false, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
edge_partition_frontier_key_first,
edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[2],
Expand All @@ -582,15 +608,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition_frontier_local_degree_offsets,
e_op,
get_dataframe_buffer_begin(aggregate_value_buffer));
};
}
auto hypersparse_size = edge_partition_v_frontier_partition_offsets[4] -
edge_partition_v_frontier_partition_offsets[3];
if (hypersparse_size > 0) {
raft::grid_1d_thread_t update_grid(hypersparse_size,
detail::transform_v_frontier_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
cudastf_ctx.task(l_tv_buffers[3].write())->*[&](cudaStream_t stream) {
detail::transform_v_frontier_e_hypersparse_or_low_degree<true, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
edge_partition_frontier_key_first,
edge_partition_key_indices.begin() + edge_partition_v_frontier_partition_offsets[3],
Expand All @@ -602,15 +630,17 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition_frontier_local_degree_offsets,
e_op,
get_dataframe_buffer_begin(aggregate_value_buffer));
};
}
} else {
raft::grid_1d_thread_t update_grid(
(local_frontier_offsets[i + 1] - local_frontier_offsets[i]),
detail::transform_v_frontier_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);

cudastf_ctx.task(l_tv_buffers[4].write())->*[&,i](cudaStream_t stream) {
detail::transform_v_frontier_e_hypersparse_or_low_degree<false, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
edge_partition_frontier_key_first,
thrust::make_counting_iterator(size_t{0}),
Expand All @@ -622,9 +652,12 @@ auto transform_v_frontier_e(raft::handle_t const& handle,
edge_partition_frontier_local_degree_offsets,
e_op,
get_dataframe_buffer_begin(aggregate_value_buffer));
};
}
}

cudastf_ctx.finalize();

return std::make_tuple(std::move(aggregate_value_buffer),
std::move(aggregate_local_frontier_local_degree_offsets));
}
Expand Down
Loading