|
26 | 26 | #include "velox/exec/Operator.h" |
27 | 27 | #include "velox/vector/ComplexVector.h" |
28 | 28 |
|
29 | | -#include <cudf/copying.hpp> |
30 | | -#include <cudf/table/table.hpp> |
| 29 | +#include <cudf/types.hpp> |
31 | 30 | #include <cudf/utilities/default_stream.hpp> |
32 | 31 |
|
33 | 32 | namespace facebook::velox::cudf_velox { |
@@ -204,116 +203,136 @@ std::optional<uint64_t> CudfToVelox::averageRowSize() { |
204 | 203 | return averageRowSize_; |
205 | 204 | } |
206 | 205 |
|
| 206 | +// Pop inputs_.front(), convert its GPU table to a Velox RowVector via a |
| 207 | +// single to_arrow_host + synchronize, and return it. The caller is |
| 208 | +// responsible for any further slicing. |
| 209 | +RowVectorPtr CudfToVelox::convertFrontToVelox() { |
| 210 | + auto cudfVector = std::move(inputs_.front()); |
| 211 | + inputs_.pop_front(); |
| 212 | + auto stream = cudfVector->stream(); |
| 213 | + auto tableView = cudfVector->getTableView(); |
| 214 | + auto output = with_arrow::toVeloxColumn( |
| 215 | + tableView, pool(), outputType_, "", stream, get_temp_mr()); |
| 216 | + stream.synchronize(); |
| 217 | + output->setType(outputType_); |
| 218 | + return output; |
| 219 | +} |
| 220 | + |
| 221 | +// Output batching strategy |
| 222 | +// ======================== |
| 223 | +// The key constraint is minimising D->H (device-to-host) transfers. |
| 224 | +// Each call to toVeloxColumn / to_arrow_host triggers one D->H copy per |
| 225 | +// column, so calling it once per output batch (rather than once per row |
| 226 | +// or once per input batch) is critical for performance. |
| 227 | +// |
| 228 | +// Two cases arise depending on the size of the front GPU input relative |
| 229 | +// to targetBatchSize: |
| 230 | +// |
| 231 | +// (A) Front input >= targetBatchSize (e.g. CudfOrderBy: one large sorted |
| 232 | +// table). We convert the whole input to Velox in one shot and then |
| 233 | +// slice it purely on the CPU using BaseVector::slice(). Subsequent |
| 234 | +// getOutput() calls return successive CPU slices with no additional |
| 235 | +// D->H work until veloxBuffer_ is exhausted. |
| 236 | +// |
| 237 | +// (B) Front input < targetBatchSize (e.g. CudfFilterProject with high |
| 238 | +// selectivity: many small GPU batches). We concatenate inputs on device |
| 239 | +// until we accumulate targetBatchSize rows, then convert the concat |
| 240 | +// result to Velox in one shot. This preserves the GPU-side merge |
| 241 | +// that avoids emitting many undersized Velox batches downstream. |
| 242 | +// |
| 243 | +// In both cases exactly one toVeloxColumn + stream.synchronize() is issued |
| 244 | +// per output batch, regardless of how many GPU inputs were consumed. |
RowVectorPtr CudfToVelox::getOutput() {
  VELOX_NVTX_OPERATOR_FUNC_RANGE();
  // Once finished_ is set, keep reporting no output (operator protocol).
  if (finished_) {
    return nullptr;
  }

  // Drain veloxBuffer_ (populated on a previous call) before consuming
  // more GPU inputs.
  if (!veloxBuffer_) {
    if (inputs_.empty()) {
      // Nothing buffered and nothing queued: finish only once upstream has
      // declared no more input; otherwise wait for the next addInput.
      finished_ = noMoreInput_;
      return nullptr;
    }

    // Passthrough mode: emit each GPU input as a single Velox batch with no
    // re-batching. Used when the caller knows the batch size is already
    // correct (e.g. default pipeline without explicit batch-size overrides).
    if (isPassthroughMode()) {
      auto output = convertFrontToVelox();
      finished_ = noMoreInput_ && inputs_.empty();
      if (output->size() == 0) {
        // Empty batch: report no output; the driver calls again if more
        // inputs remain.
        return nullptr;
      }
      return output;
    }

    const auto targetBatchSize = outputBatchRows(averageRowSize());

    if (static_cast<vector_size_t>(inputs_.front()->size()) >=
        targetBatchSize) {
      // Case A: large input. Convert once; subsequent calls slice CPU-side.
      veloxBuffer_ = convertFrontToVelox();
      veloxOffset_ = 0;
      averageRowSize_ = std::nullopt; // recompute from next input
    } else {
      // Case B: small inputs. GPU-concat until we reach targetBatchSize,
      // then convert the merged table in one D->H transfer.
      // NOTE(review): the front input's stream is used for the whole
      // concat; assumes getConcatenatedTable orders work correctly when
      // later inputs were produced on other streams — confirm.
      auto stream = inputs_.front()->stream();
      std::vector<CudfVectorPtr> toConcat;
      vector_size_t accumulated = 0;
      while (!inputs_.empty() && accumulated < targetBatchSize) {
        accumulated += static_cast<vector_size_t>(inputs_.front()->size());
        toConcat.push_back(std::move(inputs_.front()));
        inputs_.pop_front();
      }
      // cudf tables are indexed by cudf::size_type (int32); guard the sum
      // before building the concatenated table.
      VELOX_CHECK_LE(
          accumulated,
          std::numeric_limits<cudf::size_type>::max(),
          "Accumulated row count exceeds cudf int32 limit");
      auto concatTable = getConcatenatedTable(
          std::move(toConcat), outputType_, stream, get_temp_mr());
      auto tableView = concatTable->view();
      // Single D->H conversion for the whole merged batch.
      veloxBuffer_ = with_arrow::toVeloxColumn(
          tableView, pool(), outputType_, "", stream, get_temp_mr());
      stream.synchronize();
      veloxBuffer_->setType(outputType_);
      veloxOffset_ = 0;
      averageRowSize_ = std::nullopt;
    }
  }

  // Slice veloxBuffer_ on the CPU to produce the next output batch.
  const auto totalRows = static_cast<vector_size_t>(veloxBuffer_->size());
  if (veloxOffset_ >= totalRows) {
    // Buffer fully drained (or was empty); release it and re-evaluate
    // completion.
    veloxBuffer_.reset();
    finished_ = noMoreInput_ && inputs_.empty();
    return nullptr;
  }

  // Size the slice from this buffer's own average row width (flat size /
  // rows) so output batches honor the configured row/byte targets; the
  // max(totalRows, 1) guards the division for a zero-row buffer.
  const auto targetBatchSize = outputBatchRows(
      veloxBuffer_->estimateFlatSize() /
      static_cast<uint64_t>(std::max<vector_size_t>(totalRows, 1)));
  const auto take = std::min(targetBatchSize, totalRows - veloxOffset_);

  auto slice = std::dynamic_pointer_cast<RowVector>(
      veloxBuffer_->slice(veloxOffset_, take));
  VELOX_CHECK_NOT_NULL(slice);
  veloxOffset_ += take;

  if (veloxOffset_ >= totalRows) {
    // Last slice emitted: drop the buffer eagerly so its memory is not held
    // until the next getOutput call.
    veloxBuffer_.reset();
    finished_ = noMoreInput_ && inputs_.empty();
  }

  return slice;
}
313 | 331 |
|
314 | 332 | void CudfToVelox::close() { |
315 | 333 | exec::Operator::close(); |
316 | 334 | inputs_.clear(); |
| 335 | + veloxBuffer_.reset(); |
317 | 336 | } |
318 | 337 |
|
319 | 338 | } // namespace facebook::velox::cudf_velox |
0 commit comments