Skip to content
This repository was archived by the owner on May 6, 2024. It is now read-only.

Commit b8fe9e8

Browse files
authored
[POAE7-2913] Vectorized Filter Support (#414)
* Remove column num function from C2R. * Add bit operations. * Support vectorized filter. * Fix format. * Fix broken UTs.
1 parent 0ad0de0 commit b8fe9e8

13 files changed

+349
-67
lines changed

cpp/src/cider/exec/nextgen/context/CodegenContext.h

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ struct CodegenOptions {
8686
jitlib::CompilationOptions co = jitlib::CompilationOptions{};
8787
};
8888

89+
struct FilterDescriptor {
90+
bool applied_filter_mask{false};
91+
jitlib::JITValuePointer filter_i64_mask{nullptr};
92+
jitlib::JITValuePointer start_index{nullptr};
93+
};
94+
8995
class CodegenContext {
9096
public:
9197
// Default-constructs a context whose JIT function pointer starts out null.
CodegenContext()
    : jit_func_(nullptr) {}
@@ -102,6 +108,33 @@ class CodegenContext {
102108
jit_module_ = std::move(jit_module);
103109
}
104110

111+
// Installs `mask` and `start_index` as the context's current filter mask.
// Both JIT values must carry the INT64 type tag (enforced with CHECK).
// After this call isAppliedFilterMask() returns true.
void setFilterMask(jitlib::JITValuePointer& mask,
                   jitlib::JITValuePointer& start_index) {
  CHECK(mask->getValueTypeTag() == jitlib::JITTypeTag::INT64);
  CHECK(start_index->getValueTypeTag() == jitlib::JITTypeTag::INT64);

  FilterDescriptor& desc = filter_desc_;
  desc.filter_i64_mask.replace(mask);
  desc.start_index.replace(start_index);
  desc.applied_filter_mask = true;
}
120+
121+
bool isAppliedFilterMask() { return filter_desc_.applied_filter_mask; }
122+
123+
std::pair<jitlib::JITValuePointer&, jitlib::JITValuePointer&> getFilterMask() {
124+
return {filter_desc_.filter_i64_mask, filter_desc_.start_index};
125+
}
126+
127+
template <typename FuncT>
128+
void appendDeferFunc(FuncT&& func) {
129+
defer_func_list_.emplace_back(func);
130+
}
131+
132+
// Drops every queued deferred function without invoking it.
void clearDeferFunc() {
  defer_func_list_.clear();
}
133+
134+
const std::vector<std::function<void()>>& getDeferFunc() const {
135+
return defer_func_list_;
136+
}
137+
105138
// Stores `len` as the JIT value describing the input length.
void setInputLength(jitlib::JITValuePointer& len) {
  input_len_.replace(len);
}
106139

107140
// Returns a mutable reference to the stored input-length JIT value.
jitlib::JITValuePointer& getInputLength() {
  return input_len_;
}
@@ -269,7 +302,9 @@ class CodegenContext {
269302

270303
jitlib::JITFunctionPointer jit_func_;
271304
jitlib::JITModulePointer jit_module_;
272-
jitlib::JITValuePointer input_len_;
305+
jitlib::JITValuePointer input_len_{nullptr};
306+
FilterDescriptor filter_desc_;
307+
std::vector<std::function<void()>> defer_func_list_{};
273308

274309
int64_t id_counter_{0};
275310
CodegenOptions codegen_options_;

cpp/src/cider/exec/nextgen/operators/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ add_opnode(RowToColumnNode)
3030
add_opnode(HashJoinNode)
3131
add_opnode(VectorizedProjectNode)
3232
add_opnode(CrossJoinNode)
33+
add_opnode(VectorizedFilterNode)
3334

3435
list(APPEND OPERATORS_SOURCE
3536
${CMAKE_CURRENT_LIST_DIR}/extractor/AggExtractorBuilder.cpp)

cpp/src/cider/exec/nextgen/operators/ColumnToRowNode.cpp

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -229,25 +229,49 @@ void ColumnToRowTranslator::codegenImpl(SuccessorEmitter successor_wrapper,
229229

230230
// for row loop
231231
auto index = func->createVariable(JITTypeTag::INT64, "index", 0);
232-
auto len = context.getInputLength();
233-
static_cast<ColumnToRowNode*>(node_.get())->setColumnRowNum(len);
234-
235232
auto builder = func->createLoopBuilder();
236-
builder->condition([&index, &len]() { return index < len; })
237-
->loop([&](LoopBuilder*) {
238-
for (auto& input : inputs) {
239-
ColumnReader(context, input, index).read();
240-
}
241-
successor_wrapper(successor, context);
242-
})
243-
->update([&index]() { index = index + 1l; });
233+
234+
if (!context.isAppliedFilterMask()) {
235+
auto len = context.getInputLength();
236+
builder->condition([&index, &len]() { return index < len; })
237+
->loop([&](LoopBuilder*) {
238+
for (auto& input : inputs) {
239+
ColumnReader(context, input, index).read();
240+
}
241+
successor_wrapper(successor, context);
242+
})
243+
->update([&index]() { index = index + 1l; });
244+
} else {
245+
auto&& filter_mask = context.getFilterMask();
246+
builder->condition([&filter_mask]() { return filter_mask.first != 0; })
247+
->loop([&](LoopBuilder*) {
248+
auto offset = func->emitRuntimeFunctionCall(
249+
"get_lowest_set_bit",
250+
JITFunctionEmitDescriptor{.ret_type = JITTypeTag::INT64,
251+
.params_vector = {filter_mask.first.get()}});
252+
index = *filter_mask.second + offset;
253+
for (auto& input : inputs) {
254+
ColumnReader(context, input, index).read();
255+
}
256+
successor_wrapper(successor, context);
257+
})
258+
->update([&filter_mask, &func]() {
259+
filter_mask.first = func->emitRuntimeFunctionCall(
260+
"reset_tail_set_bit",
261+
JITFunctionEmitDescriptor{.ret_type = JITTypeTag::INT64,
262+
.params_vector = {filter_mask.first.get()}});
263+
});
264+
}
244265

245266
auto c2r_node = static_cast<ColumnToRowNode*>(node_.get());
246267
builder->setNoAlias(c2r_node->isVectorizable())->build();
247268

248269
// Execute defer build functions.
249-
for (auto& defer_func : c2r_node->getDeferFunctions()) {
250-
defer_func();
270+
if (!context.isAppliedFilterMask()) {
271+
for (auto& defer_func : context.getDeferFunc()) {
272+
defer_func();
273+
}
274+
context.clearDeferFunc();
251275
}
252276
}
253277
} // namespace cider::exec::nextgen::operators

cpp/src/cider/exec/nextgen/operators/ColumnToRowNode.h

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,6 @@ class ColumnToRowNode : public OpNode {
3737

3838
TranslatorPtr toTranslator(const TranslatorPtr& successor = nullptr) override;
3939

40-
jitlib::JITValuePointer& getColumnRowNum() { return column_row_num_; }
41-
42-
void setColumnRowNum(jitlib::JITValuePointer& row_num) {
43-
CHECK(column_row_num_.get() == nullptr);
44-
column_row_num_.replace(row_num);
45-
}
46-
47-
using DeferFunc = void (*)(void*);
48-
49-
template <typename FuncT>
50-
void registerDeferFunc(FuncT&& func) {
51-
defer_func_list_.emplace_back(func);
52-
}
53-
54-
std::vector<std::function<void()>>& getDeferFunctions() { return defer_func_list_; }
55-
5640
// Returns the node's `vectorizable_` flag.
bool isVectorizable() const {
  return vectorizable_;
}
5741

5842
private:

cpp/src/cider/exec/nextgen/operators/OperatorRuntimeFunctions.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
* under the License.
2020
*/
2121

22-
#ifndef NEXTGEN_CIDER_FUNCTION_RUNTIME_FUNCTIONS_H
23-
#define NEXTGEN_CIDER_FUNCTION_RUNTIME_FUNCTIONS_H
22+
#ifndef NEXTGEN_OPERATORS_OPERATORRUNTIMEFUNCTIONS_H
23+
#define NEXTGEN_OPERATORS_OPERATORRUNTIMEFUNCTIONS_H
2424

2525
#include "exec/nextgen/context/RuntimeContext.h"
2626
#include "type/data/funcannotations.h"
@@ -211,4 +211,20 @@ extern "C" NEVER_INLINE void convert_bool_to_bit(uint8_t* byte,
211211
CiderBitUtils::byteToBit(byte, bit, len);
212212
}
213213

214-
#endif // NEXTGEN_CIDER_FUNCTION_RUNTIME_FUNCTIONS_H
214+
// Runtime helper callable from generated code: returns
// CiderBitUtils::countTailZero(data).
// NOTE(review): presumably this is the zero-based position of the lowest set
// bit of `data`; behaviour for data == 0 depends on countTailZero - confirm.
extern "C" ALWAYS_INLINE size_t get_lowest_set_bit(size_t data) {
  const size_t tail_zero_count = CiderBitUtils::countTailZero(data);
  return tail_zero_count;
}
217+
218+
// Runtime helper callable from generated code: returns
// CiderBitUtils::setTailOneToZero(data).
// NOTE(review): presumably `data` with its lowest set bit cleared - confirm
// against CiderBitUtils.
extern "C" ALWAYS_INLINE size_t reset_tail_set_bit(size_t data) {
  const size_t cleared = CiderBitUtils::setTailOneToZero(data);
  return cleared;
}
221+
222+
// Clears the bits at positions >= `len` inside the 64-bit word that contains
// bit `len` of the bit buffer `data`, so that bits beyond the logical length
// read as 0.
//
// @param data  Bit buffer, accessed as an array of uint64_t words.
//              NOTE(review): assumes the buffer is suitably aligned and padded
//              to a whole number of 64-bit words - confirm with the allocator.
// @param len   Number of valid bits in the buffer.
//
// When `len` is a multiple of 64 (including 0) there are no invalid tail bits:
// the word at index len / 64 lies past the last word holding valid bits, and
// the mask computation would shift a 64-bit value by 64, which is undefined
// behaviour. In that case the function does nothing.
extern "C" ALWAYS_INLINE void reset_tail_bits_64_align(uint8_t* data, size_t len) {
  const size_t valid_bits = len & 63;
  if (valid_bits == 0) {
    return;
  }

  uint64_t* tail_word = reinterpret_cast<uint64_t*>(data) + (len >> 6);
  // valid_bits is in [1, 63] here, so the shift is well defined; the mask keeps
  // the low `valid_bits` bits of the tail word.
  const uint64_t keep_mask = (uint64_t{1} << valid_bits) - 1;
  *tail_word &= keep_mask;
}
229+
230+
#endif // NEXTGEN_OPERATORS_OPERATORRUNTIMEFUNCTIONS_H

cpp/src/cider/exec/nextgen/operators/RowToColumnNode.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,7 @@ void RowToColumnTranslator::codegenImpl(SuccessorEmitter successor_wrapper,
458458

459459
auto output_index = func->createVariable(JITTypeTag::INT64, "output_index", 0);
460460

461-
// Get input ArrowArray length from previous C2RNode
462-
auto prev_c2r_node = static_cast<RowToColumnNode*>(node_.get())->getColumnToRowNode();
463-
auto input_array_len = prev_c2r_node->getColumnRowNum();
461+
auto input_array_len = context.getInputLength();
464462
bool bitwise_bool = static_cast<RowToColumnNode*>(node_.get())->writeBitwiseBool();
465463

466464
for (int64_t i = 0; i < exprs.size(); ++i) {
@@ -474,7 +472,7 @@ void RowToColumnTranslator::codegenImpl(SuccessorEmitter successor_wrapper,
474472
successor_wrapper(successor, context);
475473

476474
// Execute length field updating build function after C2R loop finished.
477-
prev_c2r_node->registerDeferFunc([output_index, &output_exprs, &context]() mutable {
475+
context.appendDeferFunc([output_index, &output_exprs, &context]() mutable {
478476
for (auto& expr : output_exprs) {
479477
size_t local_offset = expr->getLocalIndex();
480478
CHECK_NE(local_offset, 0);
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright(c) 2022-2023 Intel Corporation.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
#include "exec/nextgen/operators/VectorizedFilterNode.h"
22+
23+
#include <cstddef>
24+
#include <memory>
25+
26+
#include "exec/nextgen/context/CodegenContext.h"
27+
#include "exec/nextgen/jitlib/base/JITControlFlow.h"
28+
#include "exec/nextgen/jitlib/base/JITFunction.h"
29+
#include "exec/nextgen/jitlib/base/ValueTypes.h"
30+
#include "exec/nextgen/operators/OpNode.h"
31+
#include "exec/nextgen/operators/VectorizedProjectNode.h"
32+
#include "exec/nextgen/utils/JITExprValue.h"
33+
#include "type/data/sqltypes.h"
34+
#include "util/sqldefs.h"
35+
36+
namespace cider::exec::nextgen::operators {
37+
38+
using namespace context;
39+
using namespace jitlib;
40+
41+
// Creates the VectorizedFilterTranslator for this node, linked to `successor`.
TranslatorPtr VectorizedFilterNode::toTranslator(const TranslatorPtr& successor) {
  using Translator = VectorizedFilterTranslator;
  return createOpTranslator<Translator>(shared_from_this(), successor);
}
44+
45+
// Emits the code that evaluates the node's filter predicate for the whole
// input batch and returns the JIT value of the resulting boolean value buffer.
//
// Steps:
//   1. AND all output expressions of the node into one boolean expression.
//   2. Register a batch named "condition" for it and generate it through a
//      temporary VectorizedProjectNode.
//   3. If the combined condition is nullable, combine the value buffer with
//      the null buffer via bitBufferAnd (result written to the value buffer).
//   4. Emit a call to the runtime function "reset_tail_bits_64_align" on the
//      value buffer with the input length.
//
// Fails a CHECK if the node has no output expression or if any of them is not
// of type kBOOLEAN.
jitlib::JITValuePointer VectorizedFilterTranslator::generateFilterCondition(
    CodegenContext& context) {
  auto&& [_, exprs] = node_->getOutputExprs();
  // front() on an empty vector is undefined behaviour; fail loudly instead.
  CHECK(!exprs.empty());

  // Combine conditions with AND.
  ExprPtr condition = exprs.front();
  CHECK_EQ(condition->get_type_info().get_type(), kBOOLEAN);
  for (size_t i = 1; i < exprs.size(); ++i) {
    CHECK_EQ(exprs[i]->get_type_info().get_type(), kBOOLEAN);
    // The second SQLTypeInfo argument is false as soon as either operand is
    // nullable.
    condition = std::make_shared<Analyzer::BinOper>(
        SQLTypeInfo(kBOOLEAN, !(condition->getNullable() || exprs[i]->getNullable())),
        false,
        kAND,
        kONE,
        condition,
        exprs[i]);
  }

  // Register batch for the condition.
  auto condition_array = context.registerBatch(condition->get_type_info(), "condition");
  condition->setLocalIndex(context.appendArrowArrayValues(
      condition_array, utils::JITExprValue(0, JITExprValueType::BATCH)));

  // Generate the condition.
  ExprPtrVector condition_target{condition};
  auto vectorized_proj_node = createOpNode<VectorizedProjectNode>(condition_target);
  auto vectorized_proj_translator = vectorized_proj_node->toTranslator();
  vectorized_proj_translator->codegen(context, [](CodegenContext&) {});

  // Remove null data.
  // TODO (bigPYJ1151): Support ISNULL NOTNULL vectorization.
  auto&& buffers = context.getArrowArrayValues(condition->getLocalIndex()).second;
  utils::FixSizeJITExprValue bool_data(buffers);
  CHECK(bool_data.getValue().get());
  if (condition->getNullable()) {
    CHECK(bool_data.getNull().get());
    context::codegen_utils::bitBufferAnd(bool_data.getValue(),
                                         bool_data.getValue(),
                                         bool_data.getNull(),
                                         context.getInputLength());
  }

  // Reset tail bits.
  context.getJITFunction()->emitRuntimeFunctionCall(
      "reset_tail_bits_64_align",
      JITFunctionEmitDescriptor{
          .params_vector = {bool_data.getValue().get(), context.getInputLength().get()}});

  return bool_data.getValue();
}
94+
95+
// Generates code for this translator; the callback handed to codegen()
// forwards to the successor's consume() when a successor exists.
void VectorizedFilterTranslator::consume(context::CodegenContext& context) {
  codegen(context, [this](context::CodegenContext& ctx) {
    if (!successor_) {
      return;
    }
    successor_->consume(ctx);
  });
}
102+
103+
// Emits the filter loop.
//
// The selection mask produced by generateFilterCondition() is walked as an
// array of INT64 words. For each word the context's filter mask is set to
// {word, index of the first row covered by that word}; words equal to 0 are
// skipped with loopContinue, otherwise the successor is emitted inside the
// loop body. Once the loop is built, every deferred function queued on the
// context is run and the queue is cleared.
void VectorizedFilterTranslator::codegenImpl(SuccessorEmitter successor_wrapper,
                                             context::CodegenContext& context,
                                             void* successor) {
  auto row_mask_bits = generateFilterCondition(context);

  auto&& jit_func = context.getJITFunction();
  auto word_loop = jit_func->createLoopBuilder();

  auto word_idx =
      jit_func->createVariable(JITTypeTag::INT64, "selected_row_mask_index", 0);
  auto word_first_row =
      jit_func->createVariable(JITTypeTag::INT64, "selected_row_start_index", 0);
  // Number of 64-bit words needed to cover the input length (rounded up).
  auto word_count = (context.getInputLength() + 63) / 64;

  word_loop->condition([&word_count, &word_idx]() { return word_idx < word_count; })
      ->loop([&row_mask_bits,
              &word_idx,
              &word_first_row,
              &context,
              &successor_wrapper,
              &successor](LoopBuilder* builder) {
        auto&& mask_words = row_mask_bits->castPointerSubType(JITTypeTag::INT64);
        auto current_word = mask_words[word_idx];

        // Set current i64 selected mask.
        context.setFilterMask(current_word, word_first_row);

        // All the rows are filtered.
        builder->loopContinue(current_word == 0);

        // TODO (bigPYJ1151): Support full execution when no rows are filtered.
        // Row-based execution.
        successor_wrapper(successor, context);
      })
      ->update([&word_idx, &word_first_row]() {
        word_idx = word_idx + 1;
        word_first_row = word_first_row + 64;
      })
      ->build();

  // Run the deferred build functions now that the loop has been emitted.
  const auto& pending = context.getDeferFunc();
  for (const auto& deferred : pending) {
    deferred();
  }
  context.clearDeferFunc();
}
143+
} // namespace cider::exec::nextgen::operators

0 commit comments

Comments
 (0)