From 79be4704ca3d04728ef51bd19300d9e0fc99003c Mon Sep 17 00:00:00 2001 From: Valentin Andrei Date: Thu, 12 Sep 2024 20:36:23 -0700 Subject: [PATCH] Modify CUDA test to attempt overlapping D2H transfers Summary: Distributed checkpointing for GenAI requires very expensive memory downloads from the GPU which can block the trainer thread if it happens that it issues a new D2H transfer. For example, we want that model parameters and optimizer state downloads to overlap with compute. However if for some reason the forward pass thread or the backward pass issue a D2H transfer, it will have to wait until the checkpoint download was completed. This code is a test program for Kineto that issues CUDA kernels, memory copies and UVM accesses in a configurable way. This change enables us to issue multiple GPU D2H downloads to host memory using multiple streams on multiple threads. Previously the D2H downloads were very short because we downloaded a single output value of 4 bytes. With the change we download an entire buffer. Reviewed By: xerothermic Differential Revision: D62601073 fbshipit-source-id: ed192723403787f37d45bf63d39e1a768df4a1d3 --- libkineto/stress_test/kineto_stress_test.cpp | 9 +++- .../stress_test/random_ops_stress_test.cu | 29 ++++++++--- libkineto/stress_test/tensor_cache.cu | 52 +++++-------------- libkineto/stress_test/tensor_cache.cuh | 1 + 4 files changed, 44 insertions(+), 47 deletions(-) diff --git a/libkineto/stress_test/kineto_stress_test.cpp b/libkineto/stress_test/kineto_stress_test.cpp index ecf02adf2..3b51ba698 100644 --- a/libkineto/stress_test/kineto_stress_test.cpp +++ b/libkineto/stress_test/kineto_stress_test.cpp @@ -182,7 +182,14 @@ void create_cuda_streams(stress_test_args& test_args) { if (test_args.use_memcpy_stream) { test_args.memcpy_streams = (cudaStream_t*)malloc(test_args.num_workers * sizeof(cudaStream_t)); for (uint32_t i = 0; i < test_args.num_workers; ++i) { - checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__); + if (i % 2 != 0) { + checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__); + } else { + int leastPriority = 0; + int greatestPriority = 0; + checkCudaStatus(cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority), __LINE__); + checkCudaStatus(cudaStreamCreateWithPriority(test_args.memcpy_streams + i, cudaStreamNonBlocking, leastPriority), __LINE__); + } } } diff --git a/libkineto/stress_test/random_ops_stress_test.cu b/libkineto/stress_test/random_ops_stress_test.cu index c25c0e60e..5d6018a2c 100644 --- a/libkineto/stress_test/random_ops_stress_test.cu +++ b/libkineto/stress_test/random_ops_stress_test.cu @@ -14,6 +14,7 @@ namespace kineto_stress_test { +#define CUDA_API_PER_THREAD_DEFAULT_STREAM #define RNG_SEED 2049 // NCCL variables buffers @@ -123,15 +124,15 @@ void run_stress_test( } else { v_streams = (cudaStream_t*)malloc(test_args.num_cuda_streams * sizeof(cudaStream_t)); for (uint32_t i = 0; i < test_args.num_cuda_streams; ++i) { - checkCudaStatus(cudaStreamCreate(v_streams + i), __LINE__); + checkCudaStatus(cudaStreamCreateWithFlags(v_streams + i, cudaStreamNonBlocking), __LINE__); } if (test_args.use_memcpy_stream) { - checkCudaStatus(cudaStreamCreate(&memcpy_stream), __LINE__); + checkCudaStatus(cudaStreamCreateWithFlags(&memcpy_stream, cudaStreamNonBlocking), __LINE__); } if (test_args.use_uvm_stream) { - checkCudaStatus(cudaStreamCreate(&uvm_stream), __LINE__); + checkCudaStatus(cudaStreamCreateWithFlags(&uvm_stream, cudaStreamNonBlocking), __LINE__); } } @@ -268,17 +269,29 @@ void run_stress_test( szTransfer, cudaMemcpyDeviceToDevice), __LINE__); } - // Simulate output download - if (p_memory_pool[pair_idx].b_copy_d2h) { + // Simulate checkpoint download. The odd workers will have higher stream priorities + // but lower number of transactions + bool enable_d2h_copy = p_memory_pool[pair_idx].b_copy_d2h; + if (thread_id % 2 != 0) { + if (rand_r(&rng_state) % 100 < 97) { + enable_d2h_copy = false; + } + } + + if (enable_d2h_copy) { + // checkCudaStatus(cudaStreamSynchronize(current_stream), __LINE__); uint32_t rand_index = rand_r(&rng_state) % p_memory_pool[pair_idx].n_elements; checkCudaStatus( cudaMemcpyAsync( - h_output + i, - p_memory_pool[pair_idx].d_C + rand_index, - sizeof(float), + p_memory_pool[pair_idx].h_C, + p_memory_pool[pair_idx].d_C, + p_memory_pool[pair_idx].n_elements * sizeof(float), cudaMemcpyDeviceToHost, current_memcpy_stream), __LINE__); + uint32_t rand_idx_out = rand_r(&rng_state) % test_args.num_operations; + // checkCudaStatus(cudaStreamSynchronize(current_memcpy_stream), __LINE__); + h_output[rand_idx_out] = p_memory_pool[pair_idx].h_C[rand_index]; } // Get memory during execution diff --git a/libkineto/stress_test/tensor_cache.cu b/libkineto/stress_test/tensor_cache.cu index 9f1c104ab..8de3a33c0 100644 --- a/libkineto/stress_test/tensor_cache.cu +++ b/libkineto/stress_test/tensor_cache.cu @@ -13,6 +13,7 @@ namespace kineto_stress_test { +#define CUDA_API_PER_THREAD_DEFAULT_STREAM #define RNG_SEED 1025 // A kernel that fills a device buffer with random values @@ -92,8 +93,11 @@ void add_pairs_to_tensor_cache(tensor_cache_args cache_args, uint32_t // Simulate output download if (((float)(rand() % 32767) / 32767.0) < cache_args.prob_d2h) { p_memory_pool[i].b_copy_d2h = true; + checkCudaStatus(cudaHostAlloc(&p_memory_pool[i].h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__); + simple_lcg_host(p_memory_pool[i].h_C, num_elements); } else { p_memory_pool[i].b_copy_d2h = false; + p_memory_pool[i].h_C = NULL; } // Now we have a new tensor pair @@ -151,42 +155,6 @@ void re_initialize_buffer_values() { } void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream) { -// Older CUDA versions don't know about async malloc and free -#if defined(CUDA_VERSION) && CUDA_VERSION > 11000 && defined(ASYNC_MALLOC) - - checkCudaStatus( - cudaFreeAsync(tensor_pair->d_A, stream), - __LINE__); - checkCudaStatus( - cudaFreeAsync(tensor_pair->d_B, stream), - __LINE__); - checkCudaStatus( - cudaFreeAsync(tensor_pair->d_C, stream), - __LINE__); - - // Allocate device buffers - uint32_t num_elements = tensor_pair->n_elements; - checkCudaStatus( - cudaMallocAsync( - &tensor_pair->d_A, - num_elements * sizeof(float), - stream), - __LINE__); - checkCudaStatus( - cudaMallocAsync( - &tensor_pair->d_B, - num_elements * sizeof(float), - stream), - __LINE__); - checkCudaStatus( - cudaMallocAsync( - &tensor_pair->d_C, - num_elements * sizeof(float), - stream), - __LINE__); - -#else - checkCudaStatus(cudaFree(tensor_pair->d_A), __LINE__); checkCudaStatus(cudaFree(tensor_pair->d_B), __LINE__); checkCudaStatus(cudaFree(tensor_pair->d_C), __LINE__); @@ -203,8 +171,6 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream num_elements * sizeof(float)), __LINE__); -#endif // CUDA_VERSION >= 11000 - if (tensor_pair->b_copy_h2d) { checkCudaStatus(cudaFreeHost(tensor_pair->h_A), __LINE__); checkCudaStatus(cudaFreeHost(tensor_pair->h_B), __LINE__); @@ -215,6 +181,12 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream simple_lcg_host(tensor_pair->h_A, num_elements); simple_lcg_host(tensor_pair->h_B, num_elements); } + + if (tensor_pair->b_copy_d2h) { + checkCudaStatus(cudaFreeHost(tensor_pair->h_C), __LINE__); + checkCudaStatus(cudaHostAlloc(&tensor_pair->h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__); + simple_lcg_host(tensor_pair->h_C, num_elements); + } } void free_tensor_cache() { @@ -231,6 +203,10 @@ void free_tensor_cache() { if (p_memory_pool[i].h_B) { checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_B), __LINE__); } + + if (p_memory_pool[i].h_C) { + checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_C), __LINE__); + } } } diff --git a/libkineto/stress_test/tensor_cache.cuh b/libkineto/stress_test/tensor_cache.cuh index f6c79d76e..bcd0082c3 100644 --- a/libkineto/stress_test/tensor_cache.cuh +++ b/libkineto/stress_test/tensor_cache.cuh @@ -42,6 +42,7 @@ struct tensor_pair { // Host buffers float* h_A; float* h_B; + float* h_C; }; // The memory pool object