diff --git a/Src/Base/AMReX_Arena.cpp b/Src/Base/AMReX_Arena.cpp index 94e03d42ad..ee8bf1eef6 100644 --- a/Src/Base/AMReX_Arena.cpp +++ b/Src/Base/AMReX_Arena.cpp @@ -175,6 +175,7 @@ Arena::allocate_system (std::size_t nbytes) // NOLINT(readability-make-member-fu { std::size_t free_mem_avail = Gpu::Device::freeMemAvailable(); if (nbytes >= free_mem_avail) { + Gpu::streamSynchronizeAll(); // this could cause some memory to be freed free_mem_avail += freeUnused_protected(); // For CArena, mutex has already acquired if (abort_on_out_of_gpu_memory && nbytes >= free_mem_avail) { amrex::Abort("Out of gpu memory. Free: " + std::to_string(free_mem_avail) diff --git a/Src/Base/AMReX_CArena.H b/Src/Base/AMReX_CArena.H index d90f9f0bf5..02a2b89e4b 100644 --- a/Src/Base/AMReX_CArena.H +++ b/Src/Base/AMReX_CArena.H @@ -63,6 +63,8 @@ public: */ void free (void* vp) final; + void free_now (void* vp); + std::size_t freeUnused () final; /** diff --git a/Src/Base/AMReX_CArena.cpp b/Src/Base/AMReX_CArena.cpp index 1ecdd06d3b..bac3fbc7a0 100644 --- a/Src/Base/AMReX_CArena.cpp +++ b/Src/Base/AMReX_CArena.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -265,6 +266,23 @@ CArena::free (void* vp) return; } + if (this->isDeviceAccessible()) { + Gpu::Device::freeAfterSync(this, vp); + } else { + free_now(vp); + } +} + +void +CArena::free_now (void* vp) +{ + if (vp == nullptr) { + // + // Allow calls with NULL as allowed by C++ delete. + // + return; + } + std::lock_guard lock(carena_mutex); // diff --git a/Src/Base/AMReX_GpuDevice.H b/Src/Base/AMReX_GpuDevice.H index eb2b95943e..979433e0ee 100644 --- a/Src/Base/AMReX_GpuDevice.H +++ b/Src/Base/AMReX_GpuDevice.H @@ -16,6 +16,7 @@ #include #include #include +#include #define AMREX_GPU_MAX_STREAMS 8 @@ -46,8 +47,28 @@ using gpuDeviceProp_t = cudaDeviceProp; } #endif +namespace amrex { + class CArena; +} + namespace amrex::Gpu { +#ifdef AMREX_USE_GPU +class StreamManager { + gpuStream_t m_stream; + std::uint64_t m_stream_op_id = 0; + std::uint64_t m_last_sync = 0; + Vector> m_free_wait_list; + std::mutex m_mutex; +public: + [[nodiscard]] gpuStream_t get (); + [[nodiscard]] gpuStream_t& internal_get (); + void sync (); + void internal_after_sync (); + void stream_free (CArena* arena, void* mem); +}; +#endif + class Device { @@ -57,17 +78,32 @@ public: static void Finalize (); #if defined(AMREX_USE_GPU) - static gpuStream_t gpuStream () noexcept { return gpu_stream[OpenMP::get_thread_num()]; } + static gpuStream_t gpuStream () noexcept { + return gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get(); + } #ifdef AMREX_USE_CUDA /** for backward compatibility */ - static cudaStream_t cudaStream () noexcept { return gpu_stream[OpenMP::get_thread_num()]; } + static cudaStream_t cudaStream () noexcept { + return gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get(); + } #endif #ifdef AMREX_USE_SYCL - static sycl::queue& streamQueue () noexcept { return *(gpu_stream[OpenMP::get_thread_num()].queue); } - static sycl::queue& streamQueue (int i) noexcept { return *(gpu_stream_pool[i].queue); } + static sycl::queue& streamQueue () noexcept { + return *(gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get().queue); + } + static sycl::queue& streamQueue (int i) noexcept { + return *(gpu_stream_pool[i].get().queue); + } #endif #endif + static void freeAfterSync (CArena* arena, void* mem) noexcept { + amrex::ignore_unused(arena, mem); +#ifdef AMREX_USE_CUDA + gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].stream_free(arena, mem); +#endif + } + static int numGpuStreams () noexcept { return inSingleStreamRegion() ? 1 : max_gpu_streams; } @@ -104,6 +140,16 @@ public: */ static void streamSynchronizeAll () noexcept; +#ifdef AMREX_USE_GPU + /** + * Halt execution of code until the current AMReX GPU stream has finished processing all + * previously requested tasks. Unlike streamSynchronize which avoids redundant + * synchronizations when being called multiple times in a row, + * this function always causes the GPU stream to be synchronized + */ + static void actualStreamSynchronize (gpuStream_t stream) noexcept; +#endif + #if defined(__CUDACC__) /** Generic graph selection. These should be called by users. */ static void startGraphRecording(bool first_iter, void* h_ptr, void* d_ptr, size_t sz); @@ -196,10 +242,11 @@ private: static AMREX_EXPORT dim3 numThreadsMin; static AMREX_EXPORT dim3 numBlocksOverride, numThreadsOverride; - static AMREX_EXPORT Vector gpu_stream_pool; // The size of this is max_gpu_stream - // The non-owning gpu_stream is used to store the current stream that will be used. - // gpu_stream is a vector so that it's thread safe to write to it. - static AMREX_EXPORT Vector gpu_stream; // The size of this is omp_max_threads + static AMREX_EXPORT Vector gpu_stream_pool; // The size of this is max_gpu_stream + // The non-owning gpu_stream_index is used to store the current stream index that will be used. + // gpu_stream_index is a vector so that it's thread safe to write to it. + static AMREX_EXPORT Vector gpu_stream_index; // The size of this is omp_max_threads + static AMREX_EXPORT gpuDeviceProp_t device_prop; static AMREX_EXPORT int memory_pools_supported; static AMREX_EXPORT unsigned int max_blocks_per_launch; diff --git a/Src/Base/AMReX_GpuDevice.cpp b/Src/Base/AMReX_GpuDevice.cpp index 1bc05cee19..2d2573b793 100644 --- a/Src/Base/AMReX_GpuDevice.cpp +++ b/Src/Base/AMReX_GpuDevice.cpp @@ -1,4 +1,5 @@ +#include #include #include #include @@ -93,15 +94,102 @@ int Device::max_gpu_streams = 1; #endif #ifdef AMREX_USE_GPU + +[[nodiscard]] gpuStream_t +StreamManager::get () { + std::lock_guard lock(m_mutex); + ++m_stream_op_id; + return m_stream; +} + +[[nodiscard]] gpuStream_t& +StreamManager::internal_get () { + return m_stream; +} + +void +StreamManager::sync () { + + bool is_synced = false; + std::uint64_t sync_op = 0; + decltype(m_free_wait_list) new_empty_wait_list{}; + { + // lock mutex before accessing and modifying member variables + std::lock_guard lock(m_mutex); + is_synced = (m_stream_op_id == m_last_sync); + if (!is_synced) { + sync_op = m_stream_op_id; + m_free_wait_list.swap(new_empty_wait_list); + } + // unlock mutex before stream sync and memory free + // to avoid deadlocks from the CArena mutex + } + + if (!is_synced) { + Device::actualStreamSynchronize(m_stream); + + // synconizing the stream may have taken a long time and + // there may be new kernels launched already, so we free memory and + // set m_last_sync according to the state from before the stream was synced + + { + std::lock_guard lock(m_mutex); + m_last_sync = sync_op; + } + + for (auto [arena, mem] : new_empty_wait_list) { + arena->free_now(mem); + } + } +} + +void +StreamManager::internal_after_sync () { + + decltype(m_free_wait_list) new_empty_wait_list{}; + { + // lock mutex before accessing and modifying member variables + std::lock_guard lock(m_mutex); + m_last_sync = m_stream_op_id; + m_free_wait_list.swap(new_empty_wait_list); + // unlock mutex before memory free + // to avoid deadlocks from the CArena mutex + } + + for (auto [arena, mem] : new_empty_wait_list) { + arena->free_now(mem); + } +} + +void +StreamManager::stream_free (CArena* arena, void* mem) { + + bool is_synced = false; + { + // lock mutex before accessing and modifying member variables + std::lock_guard lock(m_mutex); + is_synced = (m_stream_op_id == m_last_sync); + if (!is_synced) { + m_free_wait_list.emplace_back(arena, mem); + } + // unlock mutex before memory free + // to avoid deadlocks from the CArena mutex + } + + if (is_synced) { + arena->free_now(mem); + } +} + dim3 Device::numThreadsMin = dim3(1, 1, 1); dim3 Device::numThreadsOverride = dim3(0, 0, 0); dim3 Device::numBlocksOverride = dim3(0, 0, 0); unsigned int Device::max_blocks_per_launch = 2560; -Vector Device::gpu_stream_pool; -Vector Device::gpu_stream; -gpuDeviceProp_t Device::device_prop; -int Device::memory_pools_supported = 0; +Vector Device::gpu_stream_pool; +Vector Device::gpu_stream_index; +gpuDeviceProp_t Device::device_prop; +int Device::memory_pools_supported = 0; constexpr int Device::warp_size; @@ -383,24 +471,25 @@ void Device::Finalize () { #ifdef AMREX_USE_GPU + streamSynchronizeAll(); Device::profilerStop(); #ifdef AMREX_USE_SYCL for (auto& s : gpu_stream_pool) { - delete s.queue; - s.queue = nullptr; + delete s.internal_get().queue; + s.internal_get().queue = nullptr; } sycl_context.reset(); sycl_device.reset(); #else for (int i = 0; i < max_gpu_streams; ++i) { - AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL( hipStreamDestroy(gpu_stream_pool[i]));, - AMREX_CUDA_SAFE_CALL(cudaStreamDestroy(gpu_stream_pool[i])); ); + AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL( hipStreamDestroy(gpu_stream_pool[i].internal_get()));, + AMREX_CUDA_SAFE_CALL(cudaStreamDestroy(gpu_stream_pool[i].internal_get())); ); } #endif - gpu_stream.clear(); + gpu_stream_index.clear(); #ifdef AMREX_USE_ACC amrex_finalize_acc(); @@ -416,7 +505,9 @@ Device::initialize_gpu (bool minimal) #ifdef AMREX_USE_GPU - gpu_stream_pool.resize(max_gpu_streams); + if (gpu_stream_pool.size() != max_gpu_streams) { + gpu_stream_pool = Vector(max_gpu_streams); + } #ifdef AMREX_USE_HIP @@ -429,7 +520,7 @@ Device::initialize_gpu (bool minimal) // AMD devices do not support shared cache banking. for (int i = 0; i < max_gpu_streams; ++i) { - AMREX_HIP_SAFE_CALL(hipStreamCreate(&gpu_stream_pool[i])); + AMREX_HIP_SAFE_CALL(hipStreamCreate(&gpu_stream_pool[i].internal_get())); } #ifdef AMREX_GPU_STREAM_ALLOC_SUPPORT @@ -457,9 +548,9 @@ Device::initialize_gpu (bool minimal) #endif for (int i = 0; i < max_gpu_streams; ++i) { - AMREX_CUDA_SAFE_CALL(cudaStreamCreate(&gpu_stream_pool[i])); + AMREX_CUDA_SAFE_CALL(cudaStreamCreate(&gpu_stream_pool[i].internal_get())); #ifdef AMREX_USE_ACC - acc_set_cuda_stream(i, gpu_stream_pool[i]); + acc_set_cuda_stream(i, gpu_stream_pool[i].internal_get()); #endif } @@ -472,7 +563,7 @@ Device::initialize_gpu (bool minimal) sycl_device = std::make_unique(gpu_devices[device_id]); sycl_context = std::make_unique(*sycl_device, amrex_sycl_error_handler); for (int i = 0; i < max_gpu_streams; ++i) { - gpu_stream_pool[i].queue = new sycl::queue(*sycl_context, *sycl_device, + gpu_stream_pool[i].internal_get().queue = new sycl::queue(*sycl_context, *sycl_device, sycl::property_list{sycl::property::queue::in_order{}}); } } @@ -555,7 +646,7 @@ Device::initialize_gpu (bool minimal) } #endif - gpu_stream.resize(OpenMP::get_max_threads(), gpu_stream_pool[0]); + gpu_stream_index.resize(OpenMP::get_max_threads(), 0); ParmParse pp("device"); @@ -625,8 +716,13 @@ int Device::numDevicePartners () noexcept int Device::streamIndex (gpuStream_t s) noexcept { - auto it = std::find(std::begin(gpu_stream_pool), std::end(gpu_stream_pool), s); - return static_cast(std::distance(std::begin(gpu_stream_pool), it)); + const int N = gpu_stream_pool.size(); + for (int i = 0; i < N ; ++i) { + if (gpu_stream_pool[i].internal_get() == s) { + return i; + } + } + return N; } #endif @@ -635,7 +731,7 @@ Device::setStreamIndex (int idx) noexcept { amrex::ignore_unused(idx); #ifdef AMREX_USE_GPU - gpu_stream[OpenMP::get_thread_num()] = gpu_stream_pool[idx % max_gpu_streams]; + gpu_stream_index[OpenMP::get_thread_num()] = idx % max_gpu_streams; #ifdef AMREX_USE_ACC amrex_set_acc_stream(idx % max_gpu_streams); #endif @@ -646,16 +742,16 @@ Device::setStreamIndex (int idx) noexcept gpuStream_t Device::resetStream () noexcept { - gpuStream_t r = gpu_stream[OpenMP::get_thread_num()]; - gpu_stream[OpenMP::get_thread_num()] = gpu_stream_pool[0]; + gpuStream_t r = gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get(); + gpu_stream_index[OpenMP::get_thread_num()] = 0; return r; } gpuStream_t Device::setStream (gpuStream_t s) noexcept { - gpuStream_t r = gpu_stream[OpenMP::get_thread_num()]; - gpu_stream[OpenMP::get_thread_num()] = s; + gpuStream_t r = gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get(); + gpu_stream_index[OpenMP::get_thread_num()] = streamIndex(s); return r; } #endif @@ -664,32 +760,23 @@ void Device::synchronize () noexcept { #ifdef AMREX_USE_SYCL - for (auto const& s : gpu_stream_pool) { - try { - s.queue->wait_and_throw(); - } catch (sycl::exception const& ex) { - amrex::Abort(std::string("synchronize: ")+ex.what()+"!!!!!"); - } + for (auto& s : gpu_stream_pool) { + s.sync(); } -#else +#elif defined(AMREX_USE_GPU) AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL(hipDeviceSynchronize());, AMREX_CUDA_SAFE_CALL(cudaDeviceSynchronize()); ) + for (auto& s : gpu_stream_pool) { + s.internal_after_sync(); + } #endif } void Device::streamSynchronize () noexcept { -#ifdef AMREX_USE_SYCL - auto& q = streamQueue(); - try { - q.wait_and_throw(); - } catch (sycl::exception const& ex) { - amrex::Abort(std::string("streamSynchronize: ")+ex.what()+"!!!!!"); - } -#else - AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL(hipStreamSynchronize(gpuStream()));, - AMREX_CUDA_SAFE_CALL(cudaStreamSynchronize(gpuStream())); ) +#ifdef AMREX_USE_GPU + gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].sync(); #endif } @@ -697,16 +784,52 @@ void Device::streamSynchronizeAll () noexcept { #ifdef AMREX_USE_GPU -#ifdef AMREX_USE_SYCL - Device::synchronize(); -#else - for (auto const& s : gpu_stream_pool) { - AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL(hipStreamSynchronize(s));, - AMREX_CUDA_SAFE_CALL(cudaStreamSynchronize(s)); ) + for (auto& s : gpu_stream_pool) { + s.sync(); } #endif +} + +#ifdef AMREX_USE_GPU +void +Device::actualStreamSynchronize (gpuStream_t stream) noexcept +{ +#if defined(AMREX_USE_CUDA) + cudaError_t amrex_i_err = cudaStreamSynchronize(stream); + if (cudaSuccess != amrex_i_err) { + std::string errStr(std::string("CUDA error from calling cudaStreamSynchronize ") + + std::to_string(amrex_i_err) + + std::string(" in file ") + __FILE__ + + ": " + cudaGetErrorString(amrex_i_err) + + "This is likely caused by an issue in a previous kernel launch " + + "such as amrex::ParallelFor"); + amrex::Abort(errStr); + } +#elif defined(AMREX_USE_HIP) + hipError_t amrex_i_err = hipStreamSynchronize(stream); + if (hipSuccess != amrex_i_err) { + std::string errStr(std::string("HIP error from calling hipStreamSynchronize") + + std::string(" in file ") + __FILE__ + + ": " + hipGetErrorString(amrex_i_err) + + "This is likely caused by an issue in a previous kernel launch " + + "such as amrex::ParallelFor"); + amrex::Abort(errStr); + } +#elif defined(AMREX_USE_SYCL) + auto& q = *(stream.queue); + try { + q.wait_and_throw(); + } catch (sycl::exception const& ex) { + std::string errStr(std::string("SYCL exception from calling queue.wait_and_throw") + + std::string(" in file ") + __FILE__ + + ": " + ex.what() + "!!!!!" + + "This is likely caused by an issue in a previous kernel launch " + + "such as amrex::ParallelFor"); + amrex::Abort(errStr); + } #endif } +#endif #if defined(__CUDACC__) && defined(AMREX_USE_CUDA) diff --git a/Src/Base/AMReX_GpuElixir.cpp b/Src/Base/AMReX_GpuElixir.cpp index f7c4757809..94450bc59b 100644 --- a/Src/Base/AMReX_GpuElixir.cpp +++ b/Src/Base/AMReX_GpuElixir.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -21,7 +22,12 @@ extern "C" { { auto p_pa = reinterpret_cast >*>(p); for (auto const& pa : *p_pa) { - pa.second->free(pa.first); + auto* carena = dynamic_cast(pa.second); + if (carena) { + carena->free_now(pa.first); + } else { + pa.second->free(pa.first); + } } delete p_pa; }