Reduce device memory usage for CAGRA's graph optimization process (reverse graph creation) #832

Draft: wants to merge 4 commits into base: branch-25.06

Changes from all commits
271 changes: 222 additions & 49 deletions cpp/src/neighbors/detail/cagra/graph_core.cuh
@@ -41,6 +41,8 @@
#include <memory>
#include <random>

#include <sys/mman.h>

namespace cuvs::neighbors::cagra::detail::graph {

// unnamed namespace to avoid multiple definition error
@@ -1313,7 +1315,20 @@ void optimize(
(double)num_full / graph_size * 100);
}

// Allocate memory for rev_graph using Transparent HugePage
constexpr size_t thp_size = 2 * 1024 * 1024;
size_t byte_size = sizeof(IdxT) * graph_size * output_graph_degree;
if (byte_size % thp_size) { byte_size += thp_size - (byte_size % thp_size); }
IdxT* rev_graph_ptr =
(IdxT*)mmap(NULL, byte_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (rev_graph_ptr == MAP_FAILED) {
perror("mmap");
exit(-1);
}
if (madvise(rev_graph_ptr, byte_size, MADV_HUGEPAGE) != 0) { perror("madvise"); }
memset(rev_graph_ptr, 0, byte_size);
auto rev_graph_view =
raft::make_host_matrix_view<IdxT, int64_t>(rev_graph_ptr, graph_size, output_graph_degree);
auto rev_graph_count = raft::make_host_vector<uint32_t, int64_t>(graph_size);
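// The mmap/madvise pattern above is the standard way to request Transparent
// HugePages for a large anonymous allocation. A minimal reusable sketch of the
// same idea (hypothetical helper, for illustration only; this PR inlines the
// same steps above):
//
//   void* alloc_thp(size_t bytes) {
//     constexpr size_t kThpSize = 2ull << 20;            // 2 MiB huge page
//     bytes = (bytes + kThpSize - 1) & ~(kThpSize - 1);  // round up to a page multiple
//     void* p = mmap(nullptr, bytes, PROT_READ | PROT_WRITE,
//                    MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
//     if (p == MAP_FAILED) { return nullptr; }
//     (void)madvise(p, bytes, MADV_HUGEPAGE);  // advisory only
//     return p;
//   }
//
// Because MADV_HUGEPAGE is only a hint, the memset above works whether or not
// huge pages were actually granted; the kernel falls back to 4 KiB pages.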

{
@@ -1324,63 +1339,216 @@ void optimize(
//
const double time_make_start = cur_time();

bool use_gpu = true;
try {
auto d_rev_graph =
raft::make_device_matrix<IdxT, int64_t>(res, graph_size, output_graph_degree);
} catch (std::bad_alloc& e) {
RAFT_LOG_DEBUG("Insufficient memory for making reverse graph on GPU");
use_gpu = false;
} catch (raft::logic_error& e) {
RAFT_LOG_DEBUG("Insufficient memory for making reverse graph on GPU (logic error)");
use_gpu = false;
}
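// The try/catch above is a probe allocation: if a graph_size x
// output_graph_degree device matrix cannot be allocated, the temporary is
// freed when it goes out of scope, use_gpu is cleared, and the host-side
// fallback below is taken instead of propagating the error.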
if (use_gpu) {
raft::common::nvtx::range<cuvs::common::nvtx::domain::cuvs> block_scope(
"cagra::graph::optimize/reverse_GPU");
RAFT_LOG_INFO("# reverse_GPU"); // **** TO BE REMOVED ****

device_matrix_view_from_host<IdxT, int64_t> d_rev_graph(res, rev_graph_view);
RAFT_CUDA_TRY(cudaMemsetAsync(d_rev_graph.data_handle(),
0xff,
graph_size * output_graph_degree * sizeof(IdxT),
raft::resource::get_cuda_stream(res)));

auto d_rev_graph_count = raft::make_device_mdarray<uint32_t>(
res, large_tmp_mr, raft::make_extents<int64_t>(graph_size));
RAFT_CUDA_TRY(cudaMemsetAsync(d_rev_graph_count.data_handle(),
0x00,
graph_size * sizeof(uint32_t),
raft::resource::get_cuda_stream(res)));

auto dest_nodes = raft::make_host_vector<IdxT, int64_t>(graph_size);
auto d_dest_nodes =
raft::make_device_mdarray<IdxT>(res, large_tmp_mr, raft::make_extents<int64_t>(graph_size));

for (uint64_t k = 0; k < output_graph_degree; k++) {
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
// dest_nodes.data_handle()[i] = output_graph_ptr[k + (output_graph_degree * i)];
dest_nodes(i) = output_graph_ptr[k + (output_graph_degree * i)];
}
raft::resource::sync_stream(res);

raft::copy(d_dest_nodes.data_handle(),
dest_nodes.data_handle(),
graph_size,
raft::resource::get_cuda_stream(res));

dim3 threads(256, 1, 1);
dim3 blocks(1024, 1, 1);
kern_make_rev_graph<<<blocks, threads, 0, raft::resource::get_cuda_stream(res)>>>(
d_dest_nodes.data_handle(),
d_rev_graph.data_handle(),
d_rev_graph_count.data_handle(),
graph_size,
output_graph_degree);
RAFT_LOG_DEBUG("# Making reverse graph on GPUs: %lu / %u \r", k, output_graph_degree);
}

raft::resource::sync_stream(res);
RAFT_LOG_DEBUG("\n");

if (d_rev_graph.allocated_memory()) {
raft::copy(rev_graph_view.data_handle(),
d_rev_graph.data_handle(),
graph_size * output_graph_degree,
raft::resource::get_cuda_stream(res));
}
raft::copy(rev_graph_count.data_handle(),
d_rev_graph_count.data_handle(),
graph_size,
raft::resource::get_cuda_stream(res));

} else {
RAFT_EXPECTS(output_graph_degree < 256,
"Degree of the generated graph must be less than 256.");
bool use_cpu_2scan = true;
if (use_cpu_2scan) {
//
// CPU: Scan the graph twice.
//
// This implementation scans the graph twice and requires fewer reads and writes
// to rev_graph. If the graph fits in host memory, this is probably faster.
//
raft::common::nvtx::range<cuvs::common::nvtx::domain::cuvs> block_scope(
"cagra::graph::optimize/reverse_CPU-2scan");
RAFT_LOG_INFO("# reverse_CPU-2scan"); // **** TO BE REMOVED ****

auto loc = raft::make_host_matrix<uint8_t, int64_t>(graph_size, output_graph_degree);
constexpr uint8_t loc_max = 255;
// Initialize arrays
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
for (uint64_t k = 0; k < output_graph_degree; k++) {
loc(i, k) = 0;
rev_graph_view(i, k) = graph_size;
}
rev_graph_count(i) = 0;
}
// 1st graph scan: compute histogram
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
for (uint64_t k = 0; k < output_graph_degree; k++) {
uint64_t j = output_graph_ptr[k + (output_graph_degree * i)];
if ((j == i) || (loc(j, k) >= output_graph_degree)) { continue; }
#pragma omp atomic
loc(j, k)++;
}
}
// Prefix-sum: compute where to write
#pragma omp parallel for
for (uint64_t j = 0; j < graph_size; j++) {
for (uint64_t k = 1; k < output_graph_degree; k++) {
loc(j, k) = std::min((uint32_t)loc_max, (uint32_t)loc(j, k - 1) + (uint32_t)loc(j, k));
}
uint64_t k = output_graph_degree - 1;
rev_graph_count(j) = std::min(output_graph_degree, (uint64_t)loc(j, k));
while (k > 0) {
loc(j, k) = loc(j, k - 1);
k--;
}
loc(j, 0) = 0;
}
// 2nd graph scan: create rev_graph
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
for (uint64_t k = 0; k < output_graph_degree; k++) {
uint64_t j = output_graph_ptr[k + (output_graph_degree * i)];
if ((j == i) || (loc(j, k) >= output_graph_degree)) { continue; }
uint8_t l;
#pragma omp atomic capture
l = loc(j, k)++;
if (l >= output_graph_degree) { continue; }
rev_graph_view(j, l) = i;
}
}
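// Illustration of the two-scan counting sort (not part of the change): with
// degree 2 and out-edges 0 -> {1, 2}, 1 -> {2, 0}, 2 -> {0, 1}, the first scan
// counts incoming edges per (destination, rank), the exclusive prefix sum over
// ranks turns those counts into write offsets, and the second scan re-walks
// the same edges, scattering each source i into rev_graph_view(j, loc(j, k)++)
// so every writer gets a distinct slot. Node 0 then ends up with the reverse
// list {2, 1}: node 2 points to it at rank 0 and node 1 at rank 1.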
} else {
//
// CPU: Scan the graph just once, without atomics.
//
// This implementation performs many small memory copies but scans the graph
// only once. If the graph does not fit in host memory and instead resides on
// storage, this variant is likely the more appropriate one.
//
raft::common::nvtx::range<cuvs::common::nvtx::domain::cuvs> block_scope(
"cagra::graph::optimize/reverse_CPU-1scan");
RAFT_LOG_INFO("# reverse_CPU-1scan"); // **** TO BE REMOVED ****

auto rank = raft::make_host_matrix<uint8_t, int64_t>(graph_size, output_graph_degree);
constexpr uint8_t max_rank = 255;
// Initialize arrays
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
for (uint64_t k = 0; k < output_graph_degree; k++) {
rank(i, k) = max_rank;
rev_graph_view(i, k) = graph_size;
}
rev_graph_count(i) = 0;
}
// Scan the graph
#pragma omp parallel
{
int tid = 0;
int num_threads = 1;
#ifdef _OPENMP
tid = omp_get_thread_num();
num_threads = omp_get_num_threads();
#endif
for (uint64_t i = 0; i < graph_size; i++) {
for (uint64_t r = 0; r < output_graph_degree; r++) {
uint64_t j = output_graph_ptr[r + (output_graph_degree * i)];
if (j == i) {
// Ignore the self-edge.
continue;
}
if ((j % (uint64_t)num_threads) != (uint64_t)tid) {
// Restrict updates of a certain array element to a single
// thread, making the scan atomic-free.
continue;
}
// Find a location to insert
uint64_t l0 = 0;
uint64_t l1 = std::min(output_graph_degree, (uint64_t)rev_graph_count(j));
uint64_t loc;
while (l0 < l1) {
loc = (l0 + l1) / 2;
if (rank(j, loc) <= r) {
l0 = loc + 1;
} else {
l1 = loc;
}
}
loc = (l0 + l1) / 2;
if (loc >= output_graph_degree) { continue; }
// Insert: copy and write
for (uint64_t l = std::min(output_graph_degree - 1, (uint64_t)rev_graph_count(j));
l > loc;
l--) {
rank(j, l) = rank(j, l - 1);
rev_graph_view(j, l) = rev_graph_view(j, l - 1);
}
rank(j, loc) = r;
rev_graph_view(j, loc) = i;
if (rev_graph_count(j) < output_graph_degree) { rev_graph_count(j) += 1; }
}
}
}
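// How the atomic-free variant above stays race-free (illustration, not part
// of the change): each thread only updates rows it owns (j % num_threads ==
// tid), so no two threads ever write the same rev_graph_view row. Within a
// row, the binary search keeps entries sorted by source rank r, and the
// insertion shift drops the highest-rank entry once the row is full.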
}
}

const double time_make_end = cur_time();
RAFT_LOG_DEBUG("# Making reverse graph time: %.1lf sec", time_make_end - time_make_start);
// RAFT_LOG_DEBUG("# Making reverse graph time: %.1lf sec", time_make_end - time_make_start);
RAFT_LOG_INFO("# Making reverse graph time: %.3lf sec", time_make_end - time_make_start);
}

{
Expand All @@ -1394,7 +1562,7 @@ void optimize(
bool check_num_protected_edges = true;
#pragma omp parallel for
for (uint64_t i = 0; i < graph_size; i++) {
auto my_rev_graph = rev_graph_view.data_handle() + (output_graph_degree * i);
auto my_out_graph = output_graph_ptr + (output_graph_degree * i);

// If guarantee_connectivity == true, use a temporal list to merge the neighbor lists of the
Expand Down Expand Up @@ -1484,6 +1652,11 @@ void optimize(
(double)num_replaced_edges / graph_size);
}

if (munmap(rev_graph_ptr, byte_size) != 0) {
perror("munmap");
exit(-1);
}
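// Note that munmap receives the same rounded-up byte_size that was passed to
// mmap, so the padding added for 2 MiB alignment is released together with
// the data.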

// Check number of incoming edges
{
raft::common::nvtx::range<cuvs::common::nvtx::domain::cuvs> block_scope(