diff --git a/libkineto/stress_test/kineto_stress_test.cpp b/libkineto/stress_test/kineto_stress_test.cpp
index ecf02adf2..3b51ba698 100644
--- a/libkineto/stress_test/kineto_stress_test.cpp
+++ b/libkineto/stress_test/kineto_stress_test.cpp
@@ -182,7 +182,14 @@ void create_cuda_streams(stress_test_args& test_args) {
   if (test_args.use_memcpy_stream) {
     test_args.memcpy_streams = (cudaStream_t*)malloc(test_args.num_workers * sizeof(cudaStream_t));
     for (uint32_t i = 0; i < test_args.num_workers; ++i) {
-      checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__);
+      if (i % 2 != 0) {
+        checkCudaStatus(cudaStreamCreateWithFlags(test_args.memcpy_streams + i, cudaStreamNonBlocking), __LINE__);
+      } else {
+        int leastPriority = 0;
+        int greatestPriority = 0;
+        checkCudaStatus(cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority), __LINE__);
+        checkCudaStatus(cudaStreamCreateWithPriority(test_args.memcpy_streams + i, cudaStreamNonBlocking, leastPriority), __LINE__);
+      }
     }
   }
 
diff --git a/libkineto/stress_test/random_ops_stress_test.cu b/libkineto/stress_test/random_ops_stress_test.cu
index c25c0e60e..5d6018a2c 100644
--- a/libkineto/stress_test/random_ops_stress_test.cu
+++ b/libkineto/stress_test/random_ops_stress_test.cu
@@ -14,6 +14,7 @@
 
 namespace kineto_stress_test {
 
+#define CUDA_API_PER_THREAD_DEFAULT_STREAM
 #define RNG_SEED 2049
 
 // NCCL variables buffers
@@ -123,15 +124,15 @@ void run_stress_test(
   } else {
     v_streams = (cudaStream_t*)malloc(test_args.num_cuda_streams * sizeof(cudaStream_t));
     for (uint32_t i = 0; i < test_args.num_cuda_streams; ++i) {
-      checkCudaStatus(cudaStreamCreate(v_streams + i), __LINE__);
+      checkCudaStatus(cudaStreamCreateWithFlags(v_streams + i, cudaStreamNonBlocking), __LINE__);
     }
 
     if (test_args.use_memcpy_stream) {
-      checkCudaStatus(cudaStreamCreate(&memcpy_stream), __LINE__);
+      checkCudaStatus(cudaStreamCreateWithFlags(&memcpy_stream, cudaStreamNonBlocking), __LINE__);
     }
 
     if (test_args.use_uvm_stream) {
-      checkCudaStatus(cudaStreamCreate(&uvm_stream), __LINE__);
+      checkCudaStatus(cudaStreamCreateWithFlags(&uvm_stream, cudaStreamNonBlocking), __LINE__);
     }
   }
 
@@ -268,17 +269,29 @@ void run_stress_test(
             szTransfer,
             cudaMemcpyDeviceToDevice), __LINE__);
       }
 
-      // Simulate output download
-      if (p_memory_pool[pair_idx].b_copy_d2h) {
+      // Simulate checkpoint download. The odd workers will have higher stream priorities
+      // but lower number of transactions
+      bool enable_d2h_copy = p_memory_pool[pair_idx].b_copy_d2h;
+      if (thread_id % 2 != 0) {
+        if (rand_r(&rng_state) % 100 < 97) {
+          enable_d2h_copy = false;
+        }
+      }
+
+      if (enable_d2h_copy) {
+        // checkCudaStatus(cudaStreamSynchronize(current_stream), __LINE__);
         uint32_t rand_index = rand_r(&rng_state) % p_memory_pool[pair_idx].n_elements;
         checkCudaStatus(
             cudaMemcpyAsync(
-                h_output + i,
-                p_memory_pool[pair_idx].d_C + rand_index,
-                sizeof(float),
+                p_memory_pool[pair_idx].h_C,
+                p_memory_pool[pair_idx].d_C,
+                p_memory_pool[pair_idx].n_elements * sizeof(float),
                 cudaMemcpyDeviceToHost,
                 current_memcpy_stream), __LINE__);
+        uint32_t rand_idx_out = rand_r(&rng_state) % test_args.num_operations;
+        // checkCudaStatus(cudaStreamSynchronize(current_memcpy_stream), __LINE__);
+        h_output[rand_idx_out] = p_memory_pool[pair_idx].h_C[rand_index];
       }
 
       // Get memory during execution
diff --git a/libkineto/stress_test/tensor_cache.cu b/libkineto/stress_test/tensor_cache.cu
index 9f1c104ab..8de3a33c0 100644
--- a/libkineto/stress_test/tensor_cache.cu
+++ b/libkineto/stress_test/tensor_cache.cu
@@ -13,6 +13,7 @@
 
 namespace kineto_stress_test {
 
+#define CUDA_API_PER_THREAD_DEFAULT_STREAM
 #define RNG_SEED 1025
 
 // A kernel that fills a device buffer with random values
@@ -92,8 +93,11 @@ void add_pairs_to_tensor_cache(tensor_cache_args cache_args, uint32_t
     // Simulate output download
     if (((float)(rand() % 32767) / 32767.0) < cache_args.prob_d2h) {
       p_memory_pool[i].b_copy_d2h = true;
+      checkCudaStatus(cudaHostAlloc(&p_memory_pool[i].h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__);
+      simple_lcg_host(p_memory_pool[i].h_C, num_elements);
     } else {
       p_memory_pool[i].b_copy_d2h = false;
+      p_memory_pool[i].h_C = NULL;
     }
 
     // Now we have a new tensor pair
@@ -151,42 +155,6 @@ void re_initialize_buffer_values() {
 }
 
 void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream) {
-// Older CUDA versions don't know about async malloc and free
-#if defined(CUDA_VERSION) && CUDA_VERSION > 11000 && defined(ASYNC_MALLOC)
-
-  checkCudaStatus(
-      cudaFreeAsync(tensor_pair->d_A, stream),
-      __LINE__);
-  checkCudaStatus(
-      cudaFreeAsync(tensor_pair->d_B, stream),
-      __LINE__);
-  checkCudaStatus(
-      cudaFreeAsync(tensor_pair->d_C, stream),
-      __LINE__);
-
-  // Allocate device buffers
-  uint32_t num_elements = tensor_pair->n_elements;
-  checkCudaStatus(
-      cudaMallocAsync(
-          &tensor_pair->d_A,
-          num_elements * sizeof(float),
-          stream),
-      __LINE__);
-  checkCudaStatus(
-      cudaMallocAsync(
-          &tensor_pair->d_B,
-          num_elements * sizeof(float),
-          stream),
-      __LINE__);
-  checkCudaStatus(
-      cudaMallocAsync(
-          &tensor_pair->d_C,
-          num_elements * sizeof(float),
-          stream),
-      __LINE__);
-
-#else
-
   checkCudaStatus(cudaFree(tensor_pair->d_A), __LINE__);
   checkCudaStatus(cudaFree(tensor_pair->d_B), __LINE__);
   checkCudaStatus(cudaFree(tensor_pair->d_C), __LINE__);
@@ -203,8 +171,6 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream
           num_elements * sizeof(float)),
       __LINE__);
 
-#endif // CUDA_VERSION >= 11000
-
   if (tensor_pair->b_copy_h2d) {
     checkCudaStatus(cudaFreeHost(tensor_pair->h_A), __LINE__);
     checkCudaStatus(cudaFreeHost(tensor_pair->h_B), __LINE__);
@@ -215,6 +181,12 @@ void free_and_realloc_tensor_pairs(tensor_pair *tensor_pair, cudaStream_t stream
     simple_lcg_host(tensor_pair->h_A, num_elements);
     simple_lcg_host(tensor_pair->h_B, num_elements);
   }
+
+  if (tensor_pair->b_copy_d2h) {
+    checkCudaStatus(cudaFreeHost(tensor_pair->h_C), __LINE__);
+    checkCudaStatus(cudaHostAlloc(&tensor_pair->h_C, num_elements * sizeof(float), cudaHostAllocDefault), __LINE__);
+    simple_lcg_host(tensor_pair->h_C, num_elements);
+  }
 }
 
 void free_tensor_cache() {
@@ -231,6 +203,10 @@ void free_tensor_cache() {
     if (p_memory_pool[i].h_B) {
       checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_B), __LINE__);
    }
+
+    if (p_memory_pool[i].h_C) {
+      checkCudaStatus(cudaFreeHost(p_memory_pool[i].h_C), __LINE__);
+    }
   }
 }
 
diff --git a/libkineto/stress_test/tensor_cache.cuh b/libkineto/stress_test/tensor_cache.cuh
index f6c79d76e..bcd0082c3 100644
--- a/libkineto/stress_test/tensor_cache.cuh
+++ b/libkineto/stress_test/tensor_cache.cuh
@@ -42,6 +42,7 @@ struct tensor_pair {
   // Host buffers
   float* h_A;
   float* h_B;
+  float* h_C;
 };
 
 // The memory pool object
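
For reference, a minimal standalone sketch of the stream-priority pattern adopted in create_cuda_streams above. The file name and CHECK macro are illustrative only (they stand in for checkCudaStatus in the stress test); this is not part of the patch.

// stream_priority_sketch.cu (hypothetical example, compile with nvcc)
#include <cuda_runtime.h>
#include <cstdio>

// Simple error check standing in for the stress test's checkCudaStatus().
#define CHECK(call)                                                  \
  do {                                                               \
    cudaError_t err_ = (call);                                       \
    if (err_ != cudaSuccess) {                                       \
      std::printf("CUDA error %s at line %d\n",                      \
                  cudaGetErrorString(err_), __LINE__);               \
    }                                                                \
  } while (0)

int main() {
  // leastPriority is the numerically largest value (lowest priority),
  // greatestPriority the numerically smallest value (highest priority).
  int leastPriority = 0;
  int greatestPriority = 0;
  CHECK(cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority));

  // Default-priority, non-blocking stream (the odd workers in the patch).
  cudaStream_t default_prio_stream;
  CHECK(cudaStreamCreateWithFlags(&default_prio_stream, cudaStreamNonBlocking));

  // Lowest-priority, non-blocking stream (the even workers in the patch),
  // hinting the scheduler to prefer work queued on higher-priority streams.
  cudaStream_t low_prio_stream;
  CHECK(cudaStreamCreateWithPriority(&low_prio_stream, cudaStreamNonBlocking, leastPriority));

  CHECK(cudaStreamDestroy(default_prio_stream));
  CHECK(cudaStreamDestroy(low_prio_stream));
  return 0;
}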