Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support scan filter for ORC decimal reader #11067

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 142 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(uint32_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -96,46 +97,175 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract this logic into a static function or trait class so that it can later be reused in Parquet as well.

? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Treat the filter as kAlwaysTrue if either of the following conditions is met:
// 1) No filter is present;
// 2) The filter is kIsNotNull but rawNulls is nullptr (no element is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
VELOX_UNSUPPORTED("Unsupported filter: {}.", (int)filterKind);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be the case for schema evolution. Just throw VELOX_FAIL here or VELOX_NYI for these and add the requested type and file type information to the error message.

}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
VELOX_UNSUPPORTED("Unsupported filter: {}.", (int)filterKind);
}
break;
}
default:
VELOX_UNSUPPORTED("Unsupported filter: {}.", (int)filterKind);
}
}

// Applies an IS NULL (isNull == true) or IS NOT NULL (isNull == false) check
// to the first numValues_ positions, using the bitmap in 'rawNulls'.
//
// Every position that passes is reported through addOutputRow(rows[i]) and is
// compacted to the front: its null flag is written to rawResultNulls_, and for
// the IS NOT NULL case its decimal value and scale are moved down as well.
// Does nothing when 'rawNulls' is nullptr.
template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
    const bool isNull,
    const RowSet rows,
    const uint64_t* rawNulls) {
  if (rawNulls == nullptr) {
    return;
  }

  auto* const decimals = values_->asMutable<DataT>();
  auto* const scales = scaleBuffer_->asMutable<int64_t>();

  returnReaderNulls_ = false;
  anyNulls_ = !isNull;
  allNull_ = isNull;

  // Next free slot in the compacted output.
  vector_size_t outIdx = 0;

  if (isNull) {
    // Keep only the null positions; there is no value or scale to carry over.
    for (vector_size_t pos = 0; pos < numValues_; ++pos) {
      if (!bits::isBitNull(rawNulls, pos)) {
        continue;
      }
      bits::setNull(rawResultNulls_, outIdx);
      addOutputRow(rows[pos]);
      ++outIdx;
    }
    return;
  }

  // Keep only the non-null positions, moving value and scale together.
  for (vector_size_t pos = 0; pos < numValues_; ++pos) {
    if (bits::isBitNull(rawNulls, pos)) {
      continue;
    }
    bits::setNull(rawResultNulls_, outIdx, false);
    decimals[outIdx] = decimals[pos];
    scales[outIdx] = scales[pos];
    addOutputRow(rows[pos]);
    ++outIdx;
  }
}

// Evaluates 'filter' against the first numValues_ decimal values and compacts
// the passing entries to the front of the value buffer.
//
// A position flagged null in 'rawNulls' (when provided) passes only if
// filter->testNull() is true; any other position is tested with testInt64()
// for int64_t data and testInt128() otherwise. Each passing position is
// reported through addOutputRow(rows[i]). anyNulls_ ends up true if at least
// one null passed, and allNull_ stays true unless a non-null value passed.
template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
    const common::Filter* filter,
    const RowSet rows,
    const uint64_t* rawNulls) {
  auto* const decimals = values_->asMutable<DataT>();

  returnReaderNulls_ = false;
  anyNulls_ = false;
  allNull_ = true;

  // Next free slot in the compacted output.
  vector_size_t outIdx = 0;
  for (vector_size_t pos = 0; pos < numValues_; ++pos) {
    const bool nullAtPos =
        rawNulls != nullptr && bits::isBitNull(rawNulls, pos);

    if (nullAtPos) {
      if (!filter->testNull()) {
        continue;
      }
      bits::setNull(rawResultNulls_, outIdx);
      addOutputRow(rows[pos]);
      anyNulls_ = true;
      ++outIdx;
      continue;
    }

    bool passed;
    if constexpr (std::is_same_v<DataT, int64_t>) {
      passed = filter->testInt64(decimals[pos]);
    } else {
      passed = filter->testInt128(decimals[pos]);
    }
    if (!passed) {
      continue;
    }

    // The result null bitmap is only touched when a source bitmap exists.
    if (rawNulls != nullptr) {
      bits::setNull(rawResultNulls_, outIdx, false);
    }
    decimals[outIdx] = decimals[pos];
    addOutputRow(rows[pos]);
    allNull_ = false;
    ++outIdx;
  }
}

// Reads the positions listed in 'rows': calls prepareRead() and then
// dispatches to readHelper<true> or readHelper<false> depending on whether
// the last entry of 'rows' equals rows.size() - 1.
//
// NOTE(review): this span comes from a flattened diff view, so it appears to
// show both the removed and the added side of each changed line (the filter
// check and the two forms of the readHelper call). Confirm against the actual
// file before relying on the statement list below.
template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
// NOTE(review): presumably the removed-side line; the calls below hand
// scanSpec_->filter() to readHelper, which contradicts this check.
VELOX_CHECK(!scanSpec_->filter());
// A value hook on the scan spec is rejected by this reader.
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
// Dense here means: the last requested row number equals rows.size() - 1.
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
// NOTE(review): the one-argument call looks like the pre-change form and the
// two-argument call like its replacement — confirm.
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
// NOTE(review): same old/new pairing as in the dense branch — confirm.
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

// Produces the output vector for 'rows' in '*result'. Points rawValues_ at the
// bytes of values_ and then delegates to getIntValues() with requestedType_.
template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
    const RowSet& rows,
    VectorPtr* result) {
  auto* const valueBytes = values_->asMutable<char>();
  rawValues_ = valueBytes;
  getIntValues(rows, requestedType_, result);
}

// Calls DecimalUtil::fillDecimals over the first numValues_ entries of
// values_, passing the per-value scales from scaleBuffer_, the reader's
// scale_, and the result null bitmap when one exists. The same 'values'
// pointer is passed twice, so the call presumably rewrites the buffer in
// place — confirm against DecimalUtil.
template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
// The null bitmap is optional: nullptr is passed when resultNulls() is empty.
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

// NOTE(review): the two statements below use 'rows' and 'result', which are
// not parameters of fillDecimals(). They match the body of getValues() and
// look like removed-side lines of the flattened diff — confirm.
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
12 changes: 11 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,17 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {

private:
template <bool kDense>
void readHelper(RowSet rows);
void readHelper(common::Filter* filter, RowSet rows);

void
processNulls(const bool isNull, const RowSet rows, const uint64_t* rawNulls);

void processFilter(
const common::Filter* filter,
const RowSet rows,
const uint64_t* rawNulls);

void fillDecimals();

std::unique_ptr<IntDecoder<true>> valueDecoder_;
std::unique_ptr<IntDecoder<true>> scaleDecoder_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/AggregateSpillBenchmarkBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace facebook::velox::exec::test {
class AggregateSpillBenchmarkBase : public SpillerBenchmarkBase {
public:
explicit AggregateSpillBenchmarkBase(Spiller::Type spillerType)
: spillerType_(spillerType){};
: spillerType_(spillerType) {};

/// Sets up the test.
void setUp() override;
Expand Down
119 changes: 119 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/functions/lib/IsNull.h"
#include "velox/type/Timestamp.h"
#include "velox/type/Type.h"
#include "velox/type/tests/SubfieldFiltersBuilder.h"
Expand Down Expand Up @@ -1779,6 +1780,124 @@ TEST_F(TableScanTest, validFileNoData) {
assertQuery(op, split, "");
}

// Scans data/short_decimal.orc as DECIMAL(18, 6) with several remaining
// filters and compares each result with the same predicate evaluated by
// DuckDB over an in-memory copy of the expected values.
TEST_F(TableScanTest, shortDecimalFilter) {
  functions::registerIsNotNullFunction("isnotnull");

  // Expected unscaled values; loaded into the DuckDB table 'tmp'.
  const std::vector<std::optional<int64_t>> expectedValues = {
      123456789123456789L,
      987654321123456L,
      std::nullopt,
      2000000000000000L,
      5000000000000000L,
      987654321987654321L,
      100000000000000L,
      1230000000123456L,
      120000000123456L,
      std::nullopt};
  auto expected = makeRowVector({
      makeNullableFlatVector<int64_t>(expectedValues, DECIMAL(18, 6)),
  });
  createDuckDbTable({expected});

  auto orcPath = facebook::velox::test::getDataFilePath(
      "velox/exec/tests", "data/short_decimal.orc");
  auto split = HiveConnectorSplitBuilder(orcPath)
                   .start(0)
                   .length(fs::file_size(orcPath))
                   .fileFormat(dwio::common::FileFormat::ORC)
                   .build();

  auto rowType = ROW({"d"}, {DECIMAL(18, 6)});

  // Builds a table scan with 'remainingFilter' and checks it against
  // 'duckDbSql'.
  const auto verifyScan = [&](const std::string& remainingFilter,
                              const std::string& duckDbSql) {
    auto plan = PlanBuilder()
                    .tableScan(rowType, {}, remainingFilter, rowType)
                    .planNode();
    assertQuery(plan, split, duckDbSql);
  };

  // Is not null.
  verifyScan("isnotnull(d)", "SELECT c0 FROM tmp where c0 is not null");

  // Is null.
  verifyScan("is_null(d)", "SELECT c0 FROM tmp where c0 is null");

  // BigintRange.
  verifyScan(
      "d > 2000000000.0::DECIMAL(18, 6) and d < 6000000000.0::DECIMAL(18, 6)",
      "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0");

  // NegatedBigintRange.
  verifyScan(
      "not(d between 2000000000.0::DECIMAL(18, 6) and 6000000000.0::DECIMAL(18, 6))",
      "SELECT c0 FROM tmp where c0 < 2000000000.0 or c0 > 6000000000.0");
}

// Scans data/long_decimal.orc as DECIMAL(38, 18) with several remaining
// filters and compares each result with the same predicate evaluated by
// DuckDB over an in-memory copy of the expected values.
TEST_F(TableScanTest, longDecimalFilter) {
  functions::registerIsNotNullFunction("isnotnull");

  // Expected unscaled values; loaded into the DuckDB table 'tmp'.
  const std::vector<std::optional<int128_t>> expectedValues = {
      HugeInt::parse("123456789123456789123456789" + std::string(9, '0')),
      HugeInt::parse("987654321123456789" + std::string(9, '0')),
      std::nullopt,
      HugeInt::parse("2" + std::string(37, '0')),
      HugeInt::parse("5" + std::string(37, '0')),
      HugeInt::parse("987654321987654321987654321" + std::string(9, '0')),
      HugeInt::parse("1" + std::string(26, '0')),
      HugeInt::parse("123000000012345678" + std::string(10, '0')),
      HugeInt::parse("120000000123456789" + std::string(9, '0')),
      HugeInt::parse("9" + std::string(37, '0'))};
  auto expected = makeRowVector({
      makeNullableFlatVector<int128_t>(expectedValues, DECIMAL(38, 18)),
  });
  createDuckDbTable({expected});

  auto orcPath = facebook::velox::test::getDataFilePath(
      "velox/exec/tests", "data/long_decimal.orc");
  auto split = HiveConnectorSplitBuilder(orcPath)
                   .start(0)
                   .length(fs::file_size(orcPath))
                   .fileFormat(dwio::common::FileFormat::ORC)
                   .build();

  auto rowType = ROW({"d"}, {DECIMAL(38, 18)});

  // Builds a table scan with 'remainingFilter' and checks it against
  // 'duckDbSql'.
  const auto verifyScan = [&](const std::string& remainingFilter,
                              const std::string& duckDbSql) {
    auto plan = PlanBuilder()
                    .tableScan(rowType, {}, remainingFilter, rowType)
                    .planNode();
    assertQuery(plan, split, duckDbSql);
  };

  // Is not null.
  verifyScan("isnotnull(d)", "SELECT c0 FROM tmp where c0 is not null");

  // Is null.
  verifyScan("is_null(d)", "SELECT c0 FROM tmp where c0 is null");

  // HugeintRange.
  verifyScan(
      "d > 2000000000.0::DECIMAL(38, 18) and d < 6000000000.0::DECIMAL(38, 18)",
      "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0");
}

// An invalid (size = 0) file.
TEST_F(TableScanTest, emptyFile) {
auto filePath = TempFilePath::create();
Expand Down
Binary file added velox/exec/tests/data/long_decimal.orc
Binary file not shown.
Binary file added velox/exec/tests/data/short_decimal.orc
Binary file not shown.
7 changes: 7 additions & 0 deletions velox/expression/ExprToSubfieldFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,13 @@ std::unique_ptr<common::Filter> leafCallToSubfieldFilter(
}
return isNull();
}
} else if (call.name() == "isnotnull") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't IS NOT NULL parsed into not(is_null(...))?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Spark there is an 'IsNotNull' expression, and it is frequently used in filter pushdown. This issue provides the background for this change: #11093. Thanks.

if (toSubfield(leftSide, subfield)) {
if (negated) {
return isNull();
}
return isNotNull();
}
}
return nullptr;
}
Expand Down
Loading