Skip to content

Commit 22fec44

Browse files
karthikeyannmeta-codesync[bot]
authored andcommitted
feat(cudf): Add round robin batch support for Local partition in Velox-cudf (facebookincubator#14664)
Summary: Round Robin batch partitioning can be supported with CPU LocalPartition operator itself. This PR adds changes to use LocalPartition only for Hash, and use LocalPartition for RoundRobin. Unit tests are added to test RoundRobin under difference configurations. Unit tests - roundRobinMultipleBatches - roundRobinEmptyInput - roundRobinMultipleSources - roundRobinWithAggregation - roundRobinWithTableScan - roundRobinAllCombinations This PR also has related fixes to CudfPlanBuilder after commit facebookincubator@31200dd which affects 3 function interfaces. Pull Request resolved: facebookincubator#14664 Reviewed By: Yuhta Differential Revision: D83667533 Pulled By: kgpai fbshipit-source-id: c8e555f81ee3a8fcd69d5f47c34d140238f894d3
1 parent 68ce938 commit 22fec44

File tree

4 files changed

+355
-4
lines changed

4 files changed

+355
-4
lines changed

velox/experimental/cudf/exec/CudfLocalPartition.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,23 @@
1717
#include "velox/experimental/cudf/exec/CudfLocalPartition.h"
1818
#include "velox/experimental/cudf/vector/CudfVector.h"
1919

20+
#include "velox/exec/HashPartitionFunction.h"
2021
#include "velox/exec/Task.h"
2122

2223
#include <cudf/copying.hpp>
2324
#include <cudf/partitioning.hpp>
2425

2526
namespace facebook::velox::cudf_velox {
2627

28+
bool CudfLocalPartition::shouldReplace(
29+
const std::shared_ptr<const core::LocalPartitionNode>& planNode) {
30+
auto* hashFunctionSpec = dynamic_cast<const exec::HashPartitionFunctionSpec*>(
31+
&planNode->partitionFunctionSpec());
32+
// Only replace LocalPartition with CudfLocalPartition for hash partitioning.
33+
// TODO: Round Robin Row-Wise Partitioning can be supported in future.
34+
return hashFunctionSpec;
35+
}
36+
2737
CudfLocalPartition::CudfLocalPartition(
2838
int32_t operatorId,
2939
exec::DriverCtx* ctx,
@@ -55,9 +65,11 @@ CudfLocalPartition::CudfLocalPartition(
5565

5666
// Get partition function specification string
5767
std::string spec = planNode->partitionFunctionSpec().toString();
68+
auto* hashFunctionSpec = dynamic_cast<const exec::HashPartitionFunctionSpec*>(
69+
&planNode->partitionFunctionSpec());
5870

5971
// Only parse keys if it's a hash function
60-
if (spec.find("HASH(") != std::string::npos) {
72+
if (hashFunctionSpec) {
6173
// Extract keys between HASH( and )
6274
size_t start = spec.find("HASH(") + 5;
6375
size_t end = spec.find(")", start);

velox/experimental/cudf/exec/CudfLocalPartition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class CudfLocalPartition : public exec::Operator, public NvtxHelper {
5151

5252
bool isFinished() override;
5353

54+
static bool shouldReplace(
55+
const std::shared_ptr<const core::LocalPartitionNode>& planNode);
56+
5457
protected:
5558
const std::vector<std::shared_ptr<exec::LocalExchangeQueue>> queues_;
5659
const size_t numPartitions_;

velox/experimental/cudf/exec/ToCudf.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,18 @@ bool CompileState::compile(bool force_replace) {
206206
id, planNode->outputType(), ctx, planNode->id() + "-from-velox"));
207207
replaceOp.back()->initialize();
208208
}
209+
if (not replaceOp.empty()) {
210+
// from-velox only, because need to inserted before current operator.
211+
operatorsOffset += replaceOp.size();
212+
[[maybe_unused]] auto replaced = driverFactory_.replaceOperators(
213+
driver_,
214+
replacingOperatorIndex,
215+
replacingOperatorIndex,
216+
std::move(replaceOp));
217+
replacingOperatorIndex = operatorIndex + operatorsOffset;
218+
replaceOp.clear();
219+
replacementsMade = true;
220+
}
209221

210222
// This is used to denote if the current operator is kept or replaced.
211223
auto keepOperator = 0;
@@ -273,9 +285,15 @@ bool CompileState::compile(bool force_replace) {
273285
auto planNode = std::dynamic_pointer_cast<const core::LocalPartitionNode>(
274286
getPlanNode(localPartitionOp->planNodeId()));
275287
VELOX_CHECK(planNode != nullptr);
276-
replaceOp.push_back(
277-
std::make_unique<CudfLocalPartition>(id, ctx, planNode));
278-
replaceOp.back()->initialize();
288+
if (CudfLocalPartition::shouldReplace(planNode)) {
289+
replaceOp.push_back(
290+
std::make_unique<CudfLocalPartition>(id, ctx, planNode));
291+
replaceOp.back()->initialize();
292+
} else {
293+
// Round Robin batch-wise Partitioning is supported by CPU operator with
294+
// GPU Vector.
295+
keepOperator = 1;
296+
}
279297
} else if (
280298
auto localExchangeOp = dynamic_cast<exec::LocalExchange*>(oper)) {
281299
keepOperator = 1;
@@ -319,6 +337,7 @@ bool CompileState::compile(bool force_replace) {
319337
}
320338

321339
if (not replaceOp.empty()) {
340+
// ReplaceOp, to-velox.
322341
operatorsOffset += replaceOp.size() - 1 + keepOperator;
323342
[[maybe_unused]] auto replaced = driverFactory_.replaceOperators(
324343
driver_,

0 commit comments

Comments
 (0)