Skip to content

Commit 9f62073

Browse files
committed
csrc: fall back to per-call DMA on CUDA < 12.8
cudaMemcpyBatchAsync was introduced in CUDA 12.8 — guard the batch path with #if CUDA_VERSION >= 12080 and route to the per-call cudaMemcpyAsync loop below that. Default USE_BATCH_MEMCPY_* off on older toolchains so the env knob still makes sense. Also drop thread_local on the attrs/attrs_idx inputs (never mutated, no per-thread duplication needed) and move the copy_blocks dispatcher below the helpers it dispatches to.
1 parent 191f072 commit 9f62073

1 file changed

Lines changed: 42 additions & 27 deletions

File tree

kv_connectors/llmd_fs_backend/csrc/storage/tensor_copier.cu

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,16 @@ TensorCopier::TensorCopier(std::vector<torch::Tensor>& tensors,
4343
// Batched DMA is the default fast path on CUDA 12.8+; the per-call
4444
// cudaMemcpyAsync loop remains as a fallback when these flags are
4545
// explicitly set to 0 (older toolkits, debugging, A/B comparison).
46-
m_use_batch_memcpy_read = get_env_flag("USE_BATCH_MEMCPY_READ", true);
47-
m_use_batch_memcpy_write = get_env_flag("USE_BATCH_MEMCPY_WRITE", true);
46+
// cudaMemcpyBatchAsync was introduced in CUDA 12.8 — default off below that.
47+
#if CUDA_VERSION >= 12080
48+
constexpr bool kBatchDefault = true;
49+
#else
50+
constexpr bool kBatchDefault = false;
51+
#endif
52+
m_use_batch_memcpy_read =
53+
get_env_flag("USE_BATCH_MEMCPY_READ", kBatchDefault);
54+
m_use_batch_memcpy_write =
55+
get_env_flag("USE_BATCH_MEMCPY_WRITE", kBatchDefault);
4856
FS_LOG_INFO("TensorCopier: use_kernel_copy_read="
4957
<< m_use_kernel_copy_read
5058
<< ", use_kernel_copy_write=" << m_use_kernel_copy_write
@@ -103,26 +111,6 @@ void TensorCopier::copy_blocks_via_cuda_memcpy(
103111
}
104112
}
105113

106-
// Dispatches to one of three paths (priority: batch > kernel > memcpy):
107-
// - batch memcpy: one cudaMemcpyBatchAsync (CUDA 12.8+) for all
108-
// per-(block, layer) copies in this file.
109-
// - kernel copy: custom CUDA kernel doing the copies.
110-
// - memcpy loop: one cudaMemcpyAsync per (block, layer) (fallback).
111-
void TensorCopier::copy_blocks(uint8_t* cpu_base,
112-
const std::vector<int64_t>& block_ids_list,
113-
bool is_store) {
114-
bool use_batch =
115-
is_store ? m_use_batch_memcpy_write : m_use_batch_memcpy_read;
116-
bool use_kernel = is_store ? m_use_kernel_copy_write : m_use_kernel_copy_read;
117-
if (use_batch) {
118-
copy_blocks_via_batch_memcpy(cpu_base, block_ids_list, is_store);
119-
} else if (use_kernel) {
120-
copy_blocks_via_kernels(cpu_base, block_ids_list, is_store);
121-
} else {
122-
copy_blocks_via_cuda_memcpy(cpu_base, block_ids_list, is_store);
123-
}
124-
}
125-
126114
// Batched DMA path: one cudaMemcpyBatchAsync covers all per-(block, layer)
127115
// copies for the blocks in this file (num_blocks * num_tensors).
128116
// The batch executes in stream order; ordering within the batch is unspecified.
@@ -169,20 +157,23 @@ void TensorCopier::copy_blocks_via_batch_memcpy(
169157
}
170158
}
171159

160+
#if CUDA_VERSION >= 12080
172161
// Set attributes with srcAccessOrder=ANY (cudaMemcpySrcAccessOrderAny)
173162
// for malloc'd host staging buffer. Same as vLLM's cuda_mem_ops.py.
174-
thread_local cudaMemcpyAttributes attrs = [] {
163+
// static (not thread_local): never mutated, no per-thread duplication needed.
164+
// Not const: CUDA's C API takes non-const pointers.
165+
static cudaMemcpyAttributes attrs = [] {
175166
cudaMemcpyAttributes a{};
176167
a.srcAccessOrder = cudaMemcpySrcAccessOrderAny;
177168
return a;
178169
}();
179-
thread_local size_t attrs_idx = 0;
170+
static size_t attrs_idx = 0;
180171

181172
// Get current CUDA stream
182173
const auto stream = at::cuda::getCurrentCUDAStream();
183174

184175
// CUDA 13 dropped the failIdx out-param; CUDA 12.8/12.9 still requires it.
185-
#if CUDA_VERSION >= 13000
176+
#if CUDA_VERSION >= 13000
186177
cudaError_t err = cudaMemcpyBatchAsync(dsts.data(),
187178
srcs.data(),
188179
sizes.data(),
@@ -191,7 +182,7 @@ void TensorCopier::copy_blocks_via_batch_memcpy(
191182
&attrs_idx,
192183
/*numAttrs=*/1,
193184
stream.stream());
194-
#else
185+
#else
195186
static thread_local size_t fail_idx;
196187
cudaError_t err = cudaMemcpyBatchAsync(dsts.data(),
197188
srcs.data(),
@@ -202,8 +193,32 @@ void TensorCopier::copy_blocks_via_batch_memcpy(
202193
/*numAttrs=*/1,
203194
&fail_idx,
204195
stream.stream());
205-
#endif
196+
#endif
206197
TORCH_CHECK(err == cudaSuccess,
207198
"cudaMemcpyBatchAsync failed err=",
208199
cudaGetErrorString(err));
200+
#else
201+
// CUDA < 12.8: cudaMemcpyBatchAsync is not available — fall back.
202+
copy_blocks_via_cuda_memcpy(cpu_base, block_ids_list, is_store);
203+
#endif
209204
}
205+
206+
// Dispatches to one of three paths (priority: batch > kernel > memcpy):
207+
// - batch memcpy: one cudaMemcpyBatchAsync (CUDA 12.8+) for all
208+
// per-(block, layer) copies in this file.
209+
// - kernel copy: custom CUDA kernel doing the copies.
210+
// - memcpy loop: one cudaMemcpyAsync per (block, layer) (fallback).
211+
void TensorCopier::copy_blocks(uint8_t* cpu_base,
212+
const std::vector<int64_t>& block_ids_list,
213+
bool is_store) {
214+
bool use_batch =
215+
is_store ? m_use_batch_memcpy_write : m_use_batch_memcpy_read;
216+
bool use_kernel = is_store ? m_use_kernel_copy_write : m_use_kernel_copy_read;
217+
if (use_batch) {
218+
copy_blocks_via_batch_memcpy(cpu_base, block_ids_list, is_store);
219+
} else if (use_kernel) {
220+
copy_blocks_via_kernels(cpu_base, block_ids_list, is_store);
221+
} else {
222+
copy_blocks_via_cuda_memcpy(cpu_base, block_ids_list, is_store);
223+
}
224+
}

0 commit comments

Comments
 (0)