Skip to content

Commit 40a4a81

Browse files
devavretmeta-codesync[bot]
authored andcommitted
feat(cuDF): Add CudfExpression interface to allow nesting of cuDF AST and cuDF standalone functions (facebookincubator#15145)
Summary: This PR adds the following features in cuDF based expression evaluation. 1. `CudfExpression` class which is extended to implement cuDF based AST expression evaluation and standalone cuDF functions based evaluations. This allows for arbitrary nesting of AST and cudf functions. Previously only one level of nesting was supported and only AST could nest functions based evaluation. This class will be used in the future to add in JIT based evaluation without changing ast or function expressions. 2. Function registration: Now `CudfFunction`s can be registered without adding a corresponding entry in AST's pushExprToTree. 3. Expressions now return `ColumnOrView`. This puts the onus on the caller for materialization if the expression only refers to a field. This makes accessing nested fields zero-copy. 4. Expression evaluator registration: Now ASTExpression can be optionally not registered to force standalone function based evaluation. Registration also takes a priority value to prioritize one type of expression translation over the other. In case of duplicate functionality e.g when replacing a velox Expr arithmetic binary op, one can prioritize replacing with cudf AST over cudf::binary_operation This PR also refactors SubfieldFilterToAST converter and ASTExpression into separate source files. Contributes to facebookincubator#14149 Pull Request resolved: facebookincubator#15145 Reviewed By: kgpai Differential Revision: D85887084 Pulled By: kevinwilfong fbshipit-source-id: 4e0785e60b453a796cd1aed2e3c6eb08cac875ac
1 parent 883bdd4 commit 40a4a81

25 files changed

Lines changed: 2813 additions & 2119 deletions

velox/docs/configs.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,14 @@ Note: These configurations are experimental and subject to change.
11431143
- string
11441144
- ""
11451145
- The prefix to use for the function names in cuDF.
1146+
* - cudf.ast_expression_enabled
1147+
- bool
1148+
- true
1149+
- If true, enable using cuDF AST-based expression evaluation when supported.
1150+
* - cudf.ast_expression_priority
1151+
- integer
1152+
- 100
1153+
- Priority of cuDF AST expressions. Higher value wins when multiple cuDF execution options are available for the same Velox expression. Standalone cuDF functions have priority 50. If enabled, with a default priority of 100, AST will be chosen as replacement for cudf execution.
11461154
* - cudf.allow_cpu_fallback
11471155
- bool
11481156
- true

velox/experimental/cudf/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
add_subdirectory(exec)
1616
add_subdirectory(connectors)
1717
add_subdirectory(vector)
18+
add_subdirectory(expression)
1819

1920
if(VELOX_BUILD_TESTING)
2021
add_subdirectory(tests)

velox/experimental/cudf/CudfConfig.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ struct CudfConfig {
2929
static constexpr const char* kCudfMemoryPercent{"cudf.memory_percent"};
3030
static constexpr const char* kCudfFunctionNamePrefix{
3131
"cudf.function_name_prefix"};
32+
static constexpr const char* kCudfAstExpressionEnabled{
33+
"cudf.ast_expression_enabled"};
34+
static constexpr const char* kCudfAstExpressionPriority{
35+
"cudf.ast_expression_priority"};
3236
static constexpr const char* kCudfAllowCpuFallback{"cudf.allow_cpu_fallback"};
3337
static constexpr const char* kCudfLogFallback{"cudf.log_fallback"};
3438

@@ -60,6 +64,18 @@ struct CudfConfig {
6064
/// Register all the functions with the functionNamePrefix.
6165
std::string functionNamePrefix;
6266

67+
/// Enable AST in expression evaluation
68+
bool astExpressionEnabled{true};
69+
70+
/// Priority of AST expression. Expression with higher priority is chosen for
71+
/// a given root expression.
72+
/// Example:
73+
/// Priority of expression that uses individual cuDF functions is 50.
74+
/// If AST priority is 100 then for a velox expression node that is supported
75+
/// by both, AST will be chosen as replacement for cudf execution, if AST
76+
/// priority is 25 then standalone cudf function is chosen.
77+
int astExpressionPriority{100};
78+
6379
/// Whether to log a reason for falling back to Velox CPU execution.
6480
bool logFallback{true};
6581
};

velox/experimental/cudf/connectors/hive/CudfHiveDataSource.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
1919
#include "velox/experimental/cudf/connectors/hive/CudfHiveDataSource.h"
2020
#include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h"
21-
#include "velox/experimental/cudf/exec/ExpressionEvaluator.h"
2221
#include "velox/experimental/cudf/exec/ToCudf.h"
2322
#include "velox/experimental/cudf/exec/Utilities.h"
2423
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
24+
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"
25+
#include "velox/experimental/cudf/expression/SubfieldFiltersToAst.h"
2526
#include "velox/experimental/cudf/vector/CudfVector.h"
2627

2728
#include "velox/common/time/Timer.h"
@@ -130,8 +131,8 @@ CudfHiveDataSource::CudfHiveDataSource(
130131
}
131132
}();
132133

133-
cudfExpressionEvaluator_ = velox::cudf_velox::ExpressionEvaluator(
134-
remainingFilterExprSet_->exprs(), remainingFilterType_);
134+
cudfExpressionEvaluator_ = velox::cudf_velox::createCudfExpression(
135+
remainingFilterExprSet_->exprs()[0], remainingFilterType_);
135136
// TODO(kn): Get column names and subfields from remaining filter and add to
136137
// readColumnNames_
137138
}
@@ -180,7 +181,7 @@ std::optional<RowVectorPtr> CudfHiveDataSource::next(
180181
const auto originalNumColumns = cudfTableColumns.size();
181182
// Filter may need addtional computed columns which are added to
182183
// cudfTableColumns
183-
auto filterResult = cudfExpressionEvaluator_.compute(
184+
auto filterResult = cudfExpressionEvaluator_->eval(
184185
cudfTableColumns, stream_, cudf::get_current_device_resource_ref());
185186
// discard computed columns
186187
std::vector<std::unique_ptr<cudf::column>> originalColumns;
@@ -194,7 +195,7 @@ std::optional<RowVectorPtr> CudfHiveDataSource::next(
194195
// Keep only rows where the filter is true
195196
cudfTable = cudf::apply_boolean_mask(
196197
*originalTable,
197-
*filterResult[0],
198+
asView(filterResult),
198199
stream_,
199200
cudf::get_current_device_resource_ref());
200201
}

velox/experimental/cudf/connectors/hive/CudfHiveDataSource.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
#include "velox/experimental/cudf/connectors/hive/CudfHiveConfig.h"
2020
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
21-
#include "velox/experimental/cudf/exec/ExpressionEvaluator.h"
2221
#include "velox/experimental/cudf/exec/NvtxHelper.h"
22+
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"
2323

2424
#include "velox/common/base/RandomUtil.h"
2525
#include "velox/common/io/IoStatistics.h"
@@ -128,7 +128,7 @@ class CudfHiveDataSource : public DataSource, public NvtxHelper {
128128
// Expression evaluator for remaining filter.
129129
core::ExpressionEvaluator* const expressionEvaluator_;
130130
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
131-
velox::cudf_velox::ExpressionEvaluator cudfExpressionEvaluator_;
131+
std::shared_ptr<velox::cudf_velox::CudfExpression> cudfExpressionEvaluator_;
132132

133133
// Expression evaluator for subfield filter.
134134
std::vector<std::unique_ptr<cudf::scalar>> subfieldScalars_;

velox/experimental/cudf/exec/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ add_library(
2323
CudfLocalPartition.cpp
2424
CudfOrderBy.cpp
2525
DebugUtil.cpp
26-
ExpressionEvaluator.cpp
2726
ToCudf.cpp
2827
Utilities.cpp
2928
VeloxCudfInterop.cpp
@@ -40,6 +39,7 @@ target_link_libraries(
4039
velox_cudf_vector
4140
velox_exec
4241
velox_cudf_hive_connector
42+
velox_cudf_expression
4343
)
4444

4545
target_compile_options(velox_cudf_exec PRIVATE -Wno-missing-field-initializers)

velox/experimental/cudf/exec/CudfFilterProject.cpp

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,26 @@ void CudfFilterProject::initialize() {
173173
++i;
174174
}
175175
}
176-
std::vector<std::shared_ptr<velox::exec::Expr>> projectExprs;
177176
if (hasFilter_) {
178-
filterEvaluator_ = ExpressionEvaluator({expr->exprs()[0]}, inputType);
179-
projectExprs = {expr->exprs().begin() + 1, expr->exprs().end()};
177+
// First expr is Filter, rest are Project
178+
filterEvaluator_ = createCudfExpression(expr->exprs()[0], inputType);
179+
std::transform(
180+
expr->exprs().begin() + 1,
181+
expr->exprs().end(),
182+
std::back_inserter(projectEvaluators_),
183+
[inputType](const auto& expr) {
184+
return createCudfExpression(expr, inputType);
185+
});
186+
} else {
187+
std::transform(
188+
expr->exprs().begin(),
189+
expr->exprs().end(),
190+
std::back_inserter(projectEvaluators_),
191+
[inputType](const auto& expr) {
192+
return createCudfExpression(expr, inputType);
193+
});
180194
}
181195

182-
projectEvaluator_ =
183-
ExpressionEvaluator(hasFilter_ ? projectExprs : expr->exprs(), inputType);
184-
185196
filter_.reset();
186197
project_.reset();
187198
}
@@ -233,16 +244,16 @@ void CudfFilterProject::filter(
233244
std::vector<std::unique_ptr<cudf::column>>& inputTableColumns,
234245
rmm::cuda_stream_view stream) {
235246
// Evaluate the Filter
236-
auto filterColumns = filterEvaluator_.compute(
237-
inputTableColumns, stream, cudf::get_current_device_resource_ref());
238-
auto filterColumn = filterColumns[0]->view();
247+
auto filterColumn = filterEvaluator_->eval(
248+
inputTableColumns, stream, cudf::get_current_device_resource_ref(), true);
249+
auto filterColumnView = asView(filterColumn);
239250
bool shouldApplyFilter = [&]() {
240-
if (filterColumn.has_nulls()) {
251+
if (filterColumnView.has_nulls()) {
241252
return true;
242253
}
243-
// check if all values in filterColumn are true
254+
// check if all values in filterColumnView are true
244255
auto isAllTrue = cudf::reduce(
245-
filterColumn,
256+
filterColumnView,
246257
*cudf::make_all_aggregation<cudf::reduce_aggregation>(),
247258
cudf::data_type(cudf::type_id::BOOL8),
248259
stream,
@@ -256,23 +267,39 @@ void CudfFilterProject::filter(
256267
auto filterTable =
257268
std::make_unique<cudf::table>(std::move(inputTableColumns));
258269
auto filteredTable =
259-
cudf::apply_boolean_mask(*filterTable, filterColumn, stream);
270+
cudf::apply_boolean_mask(*filterTable, filterColumnView, stream);
260271
inputTableColumns = filteredTable->release();
261272
}
262273
}
263274

264275
std::vector<std::unique_ptr<cudf::column>> CudfFilterProject::project(
265276
std::vector<std::unique_ptr<cudf::column>>& inputTableColumns,
266277
rmm::cuda_stream_view stream) {
267-
auto columns = projectEvaluator_.compute(
268-
inputTableColumns, stream, cudf::get_current_device_resource_ref());
278+
std::vector<ColumnOrView> columns;
279+
for (auto& projectEvaluator : projectEvaluators_) {
280+
columns.push_back(projectEvaluator->eval(
281+
inputTableColumns,
282+
stream,
283+
cudf::get_current_device_resource_ref(),
284+
true));
285+
}
269286

270287
// Rearrange columns to match outputType_
271288
std::vector<std::unique_ptr<cudf::column>> outputColumns(outputType_->size());
272289
// computed resultProjections
273290
for (int i = 0; i < resultProjections_.size(); i++) {
274-
VELOX_CHECK_NOT_NULL(columns[i]);
275-
outputColumns[resultProjections_[i].outputChannel] = std::move(columns[i]);
291+
auto& columnOrView = columns[i];
292+
if (std::holds_alternative<std::unique_ptr<cudf::column>>(columnOrView)) {
293+
// Move the owned column
294+
outputColumns[resultProjections_[i].outputChannel] =
295+
std::move(std::get<std::unique_ptr<cudf::column>>(columnOrView));
296+
} else {
297+
// Materialize the column_view into an owned column
298+
auto view = std::get<cudf::column_view>(columnOrView);
299+
outputColumns[resultProjections_[i].outputChannel] =
300+
std::make_unique<cudf::column>(
301+
view, stream, cudf::get_current_device_resource_ref());
302+
}
276303
}
277304

278305
// Count occurrences of each inputChannel, and move columns if they occur only
@@ -300,16 +327,6 @@ std::vector<std::unique_ptr<cudf::column>> CudfFilterProject::project(
300327
inputChannelCount[identity.inputChannel]--;
301328
}
302329

303-
for (auto i = 0; i < outputColumns.size(); ++i) {
304-
const auto cudfOutputType =
305-
cudf::data_type(cudf_velox::veloxToCudfTypeId(outputType_->childAt(i)));
306-
307-
if (outputColumns[i]->type() != cudfOutputType) {
308-
outputColumns[i] =
309-
cudf::cast(*(outputColumns[i]), cudfOutputType, stream);
310-
}
311-
}
312-
313330
return outputColumns;
314331
}
315332

velox/experimental/cudf/exec/CudfFilterProject.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
#pragma once
1818

19-
#include "velox/experimental/cudf/exec/ExpressionEvaluator.h"
2019
#include "velox/experimental/cudf/exec/NvtxHelper.h"
20+
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"
2121

2222
#include "velox/core/Expressions.h"
2323
#include "velox/core/PlanNode.h"
@@ -63,8 +63,8 @@ class CudfFilterProject : public exec::Operator, public NvtxHelper {
6363

6464
void close() override {
6565
Operator::close();
66-
projectEvaluator_.close();
67-
filterEvaluator_.close();
66+
projectEvaluators_.clear();
67+
filterEvaluator_.reset();
6868
}
6969

7070
private:
@@ -78,8 +78,8 @@ class CudfFilterProject : public exec::Operator, public NvtxHelper {
7878
std::shared_ptr<const core::ProjectNode> project_;
7979
std::shared_ptr<const core::FilterNode> filter_;
8080

81-
ExpressionEvaluator projectEvaluator_;
82-
ExpressionEvaluator filterEvaluator_;
81+
std::vector<CudfExpressionPtr> projectEvaluators_;
82+
CudfExpressionPtr filterEvaluator_;
8383

8484
std::vector<velox::exec::IdentityProjection> resultProjections_;
8585
std::vector<velox::exec::IdentityProjection> identityProjections_;

velox/experimental/cudf/exec/CudfHashJoin.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
#include "velox/experimental/cudf/CudfConfig.h"
1818
#include "velox/experimental/cudf/exec/CudfHashJoin.h"
19-
#include "velox/experimental/cudf/exec/ExpressionEvaluator.h"
2019
#include "velox/experimental/cudf/exec/ToCudf.h"
2120
#include "velox/experimental/cudf/exec/Utilities.h"
2221
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
22+
#include "velox/experimental/cudf/expression/AstExpression.h"
23+
#include "velox/experimental/cudf/expression/ExpressionEvaluator.h"
2324

2425
#include "velox/exec/Task.h"
2526

0 commit comments

Comments
 (0)