Skip to content

Commit

Permalink
Modify CUDA test to attempt overlapping D2H transfers
Browse files Browse the repository at this point in the history
Summary:
Distributed checkpointing for GenAI requires very expensive memory downloads from the GPU, which can block the trainer thread if that thread issues a new D2H transfer while a checkpoint download is in flight.

For example, we want model parameter and optimizer state downloads to overlap with compute. However, if for some reason the forward-pass or backward-pass thread issues a D2H transfer, it will have to wait until the checkpoint download has completed.

This code is a test program for Kineto that issues CUDA kernels, memory copies and UVM accesses in a configurable way. This change enables us to issue multiple GPU D2H downloads to host memory using multiple streams on multiple threads. Previously the D2H downloads were very short because we downloaded a single output value of 4 bytes. With the change we download an entire buffer.

Reviewed By: xerothermic

Differential Revision: D62601073

fbshipit-source-id: ed192723403787f37d45bf63d39e1a768df4a1d3
  • Loading branch information
valentinandrei authored and facebook-github-bot committed Sep 13, 2024
1 parent ca1eedb commit 79be470
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 47 deletions.
9 changes: 8 additions & 1 deletion libkineto/stress_test/kineto_stress_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ void create_cuda_streams(stress_test_args& test_args) {
if (test_args.use_memcpy_stream) {
test_args.memcpy_streams = (cudaStream_t*)malloc(test_args.num_workers * sizeof(cudaStream_t));
for (uint32_t i = 0; i < test_args.num_workers; ++i) {
checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__);
if (i % 2 != 0) {
checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__);
} else {
int leastPriority = 0;
int greatestPriority = 0;
checkCudaStatus(cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority), __LINE__);
checkCudaStatus(cudaStreamCreateWithPriority(test_args.memcpy_streams + i, cudaStreamNonBlocking, leastPriority), __LINE__);
}
}
}

Expand Down
29 changes: 21 additions & 8 deletions libkineto/stress_test/random_ops_stress_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace kineto_stress_test {

#define CUDA_API_PER_THREAD_DEFAULT_STREAM
#define RNG_SEED 2049

// NCCL variables buffers
Expand Down Expand Up @@ -123,15 +124,15 @@ void run_stress_test(
} else {
v_streams = (cudaStream_t*)malloc(test_args.num_cuda_streams * sizeof(cudaStream_t));
for (uint32_t i = 0; i < test_args.num_cuda_streams; ++i) {
checkCudaStatus(cudaStreamCreate(v_streams + i), __LINE__);
checkCudaStatus(cudaStreamCreateWithFlags(v_streams + i, cudaStreamNonBlocking), __LINE__);
}

if (test_args.use_memcpy_stream) {
checkCudaStatus(cudaStreamCreate(&memcpy_stream), __LINE__);
checkCudaStatus(cudaStreamCreateWithFlags(&memcpy_stream, cudaStreamNonBlocking), __LINE__);
}

if (test_args.use_uvm_stream) {
checkCudaStatus(cudaStreamCreate(&uvm_stream), __LINE__);
checkCudaStatus(cudaStreamCreateWithFlags(&uvm_stream, cudaStreamNonBlocking), __LINE__);
}
}

Expand Down Expand Up @@ -268,17 +269,29 @@ void run_stress_test(
szTransfer, cudaMemcpyDeviceToDevice), __LINE__);
}

// Simulate output download
if (p_memory_pool[pair_idx].b_copy_d2h) {
// Simulate checkpoint download. The odd workers will have higher stream priorities
// but lower number of transactions
bool enable_d2h_copy = p_memory_pool[pair_idx].b_copy_d2h;
if (thread_id % 2 != 0) {
if (rand_r(&rng_state) % 100 < 97) {
enable_d2h_copy = false;
}
}

if (enable_d2h_copy) {
// checkCudaStatus(cudaStreamSynchronize(current_stream), __LINE__);
uint32_t rand_index = rand_r(&rng_state) % p_memory_pool[pair_idx].n_elements;
checkCudaStatus(
cudaMemcpyAsync(
h_output + i,
p_memory_pool[pair_idx].d_C + rand_index,
sizeof(float),
p_memory_pool[pair_idx].h_C,
p_memory_pool[pair_idx].d_C,
p_memory_pool[pair_idx].n_elements * sizeof(float),
cudaMemcpyDeviceToHost,
current_memcpy_stream),
__LINE__);
uint32_t rand_idx_out = rand_r(&rng_state) % test_args.num_operations;
// checkCudaStatus(cudaStreamSynchronize(current_memcpy_stream), __LINE__);
h_output[rand_idx_out] = p_memory_pool[pair_idx].h_C[rand_index];
}

// Get memory during execution
Expand Down
52 changes: 14 additions & 38 deletions libkineto/stress_test/tensor_cache.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace kineto_stress_test {

#define CUDA_API_PER_THREAD_DEFAULT_STREAM
#define RNG_SEED 1025

// A kernel that fills a device buffer with random values
Expand Down Expand Up @@ -92,8 +93,11 @@ void add_pairs_to_tensor_cache(tensor_cache_args cache_args, uint32_t
// Simulate output download
if (((float)(rand() % 32767) / 32767.0) < cache_args.prob_d2h) {
p_memory_pool[i].b_copy_d2h = true;
checkCudaStatus(cudaHostAlloc(&p_memory_pool[i].h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__);
simple_lcg_host(p_memory_pool[i].h_C, num_elements);
} else {
p_memory_pool[i].b_copy_d2h = false;
p_memory_pool[i].h_C = NULL;
}

// Now we have a new tensor pair
Expand Down Expand Up @@ -151,42 +155,6 @@ void re_initialize_buffer_values() {
}

void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream) {
// Older CUDA versions don't know about async malloc and free
#if defined(CUDA_VERSION) && CUDA_VERSION > 11000 && defined(ASYNC_MALLOC)

checkCudaStatus(
cudaFreeAsync(tensor_pair->d_A, stream),
__LINE__);
checkCudaStatus(
cudaFreeAsync(tensor_pair->d_B, stream),
__LINE__);
checkCudaStatus(
cudaFreeAsync(tensor_pair->d_C, stream),
__LINE__);

// Allocate device buffers
uint32_t num_elements = tensor_pair->n_elements;
checkCudaStatus(
cudaMallocAsync(
&tensor_pair->d_A,
num_elements * sizeof(float),
stream),
__LINE__);
checkCudaStatus(
cudaMallocAsync(
&tensor_pair->d_B,
num_elements * sizeof(float),
stream),
__LINE__);
checkCudaStatus(
cudaMallocAsync(
&tensor_pair->d_C,
num_elements * sizeof(float),
stream),
__LINE__);

#else

checkCudaStatus(cudaFree(tensor_pair->d_A), __LINE__);
checkCudaStatus(cudaFree(tensor_pair->d_B), __LINE__);
checkCudaStatus(cudaFree(tensor_pair->d_C), __LINE__);
Expand All @@ -203,8 +171,6 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream
num_elements * sizeof(float)),
__LINE__);

#endif // CUDA_VERSION >= 11000

if (tensor_pair->b_copy_h2d) {
checkCudaStatus(cudaFreeHost(tensor_pair->h_A), __LINE__);
checkCudaStatus(cudaFreeHost(tensor_pair->h_B), __LINE__);
Expand All @@ -215,6 +181,12 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream
simple_lcg_host(tensor_pair->h_A, num_elements);
simple_lcg_host(tensor_pair->h_B, num_elements);
}

if (tensor_pair->b_copy_d2h) {
checkCudaStatus(cudaFreeHost(tensor_pair->h_C), __LINE__);
checkCudaStatus(cudaHostAlloc(&tensor_pair->h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__);
simple_lcg_host(tensor_pair->h_C, num_elements);
}
}

void free_tensor_cache() {
Expand All @@ -231,6 +203,10 @@ void free_tensor_cache() {
if (p_memory_pool[i].h_B) {
checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_B), __LINE__);
}

if (p_memory_pool[i].h_C) {
checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_C), __LINE__);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions libkineto/stress_test/tensor_cache.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct tensor_pair {
// Host buffers
float* h_A;
float* h_B;
float* h_C;
};

// The memory pool object
Expand Down

0 comments on commit 79be470

Please sign in to comment.