Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,17 @@ std::vector<TypePtr> SplitReader::adaptColumns(
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
TypePtr fieldType;
if (outputTypeIdx.has_value()) {
// Field name exists in the user-specified output type.
fieldType = readerOutputType_->childAt(outputTypeIdx.value());
} else {
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
fieldType = tableSchema->findChild(fieldName);
}
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
fieldType, 1, connectorQueryCtx_->memoryPool()));
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/ScanSpec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ bool ScanSpec::hasFilter() const {
if (hasFilter_.has_value()) {
return hasFilter_.value();
}
if (!isConstant() && filter()) {
if (filter()) {
hasFilter_ = true;
return true;
}
Expand Down
11 changes: 6 additions & 5 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ void SelectiveStructColumnReaderBase::read(
}

const auto& childSpecs = scanSpec_->children();
VELOX_CHECK(!childSpecs.empty());
for (size_t i = 0; i < childSpecs.size(); ++i) {
const auto& childSpec = childSpecs[i];
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
Expand Down Expand Up @@ -462,15 +461,17 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
// row type that doesn't exist
// in the output.
fileType_->type()->kind() !=
TypeKind::MAP && // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
childSpec.channel() >= fileType_->size());
TypeKind::MAP // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
) &&
(useColumnNames_
? !asRowType(fileType_->type())->containsChild(childSpec.fieldName())
: childSpec.channel() >= fileType_->size());
}

void SelectiveStructColumnReaderBase::getValues(
const RowSet& rows,
VectorPtr* result) {
VELOX_CHECK(!scanSpec_->children().empty());
VELOX_CHECK_NOT_NULL(
*result, "SelectiveStructColumnReaderBase expects a non-null result");
VELOX_CHECK(
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec,
bool useColumnNames,
bool isRoot = false)
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
debugString_(
getExceptionContext().message(VeloxException::Type::kSystem)),
isRoot_(isRoot),
useColumnNames_(useColumnNames),
rows_(memoryPool_) {}

bool hasDeletion() const final {
Expand Down Expand Up @@ -164,6 +166,9 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
// table.
const bool isRoot_;

// Whether to use names for mapping table field names to file field names.
const bool useColumnNames_;

// Dense set of rows to read in next().
raw_vector<vector_size_t> rows_;

Expand Down
13 changes: 10 additions & 3 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class DwrfUnit : public LoadUnit {
uint32_t stripeIndex,
std::shared_ptr<dwio::common::ColumnSelector> columnSelector,
const std::shared_ptr<BitSet>& projectedNodes,
RowReaderOptions options)
RowReaderOptions options,
bool useColumnNames)
: stripeReaderBase_{stripeReaderBase},
strideIndexProvider_{strideIndexProvider},
columnReaderStatistics_{&columnReaderStatistics},
Expand All @@ -53,7 +54,8 @@ class DwrfUnit : public LoadUnit {
projectedNodes_{projectedNodes},
options_{std::move(options)},
stripeInfo_{
stripeReaderBase.getReader().footer().stripes(stripeIndex_)} {}
stripeReaderBase.getReader().footer().stripes(stripeIndex_)},
useColumnNames_{useColumnNames} {}

~DwrfUnit() override = default;

Expand Down Expand Up @@ -92,6 +94,9 @@ class DwrfUnit : public LoadUnit {
const RowReaderOptions options_;
const StripeInformationWrapper stripeInfo_;

// Whether to use names for mapping table field names to file field names.
const bool useColumnNames_;

// Mutables
bool preloaded_;
std::optional<uint64_t> cachedIoSize_;
Expand Down Expand Up @@ -166,6 +171,7 @@ void DwrfUnit::ensureDecoders() {
streamLabels,
*columnReaderStatistics_,
scanSpec,
useColumnNames_,
flatMapContext,
/*isRoot=*/true);
selectiveColumnReader_->setIsTopLevel();
Expand Down Expand Up @@ -328,7 +334,8 @@ std::unique_ptr<dwio::common::UnitLoader> DwrfRowReader::getUnitLoader() {
stripe,
columnSelector_,
projectedNodes_,
options_));
options_,
readerBaseShared()->readerOptions().useColumnNamesForColumnMapping()));
}
std::shared_ptr<UnitLoaderFactory> unitLoaderFactory =
options_.unitLoaderFactory();
Expand Down
14 changes: 10 additions & 4 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,19 @@ std::shared_ptr<const Type> ReaderBase::convertType(
return SMALLINT();
case TypeKind::INTEGER:
return INTEGER();
case TypeKind::BIGINT:
case TypeKind::BIGINT: {
TypePtr converted;
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
return DECIMAL(
type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
converted =
DECIMAL(type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
} else {
converted = BIGINT();
common::testutil::TestValue::adjust(
"facebook::velox::dwrf::ReaderBase::convertType", &converted);
}
return BIGINT();
return converted;
}
case TypeKind::HUGEINT:
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
Expand Down
180 changes: 168 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -104,46 +105,201 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Process filter.
process(filter, rows, rawNulls);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
bool isNull,
const RowSet& rows,
const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;

auto rawDecimal = values_->asMutable<DataT>();
auto rawScale = scaleBuffer_->asMutable<int64_t>();

vector_size_t idx = 0;
if (isNull) {
for (vector_size_t i = 0; i < numValues_; i++) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
}
} else {
for (vector_size_t i = 0; i < numValues_; i++) {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawDecimal[idx] = rawDecimal[i];
rawScale[idx] = rawScale[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
VELOX_CHECK_NOT_NULL(filter, "Filter must not be null.");
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;

vector_size_t idx = 0;
auto rawDecimal = values_->asMutable<DataT>();
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
bool tested;
if constexpr (std::is_same_v<DataT, int64_t>) {
tested = filter->testInt64(rawDecimal[i]);
} else {
tested = filter->testInt128(rawDecimal[i]);
}

if (tested) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawDecimal[idx] = rawDecimal[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
// Treat the filter as kAlwaysTrue if any of the following conditions are met:
// 1) No filter found;
// 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type BIGINT, but found file type {}.",
actualType->toString());
}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type HUGEINT, but found file type {}.",
actualType->toString());
}
break;
}
default:
VELOX_NYI("Unsupported filter: {}.", static_cast<int>(filterKind));
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
if (!resultNulls_ || !resultNulls_->unique() ||
resultNulls_->capacity() * 8 < rows.size()) {
// Make sure a dedicated resultNulls_ is allocated with enough capacity as
// RleDecoder always assumes it is available.
resultNulls_ = AlignedBuffer::allocate<bool>(rows.size(), memoryPool_);
rawResultNulls_ = resultNulls_->asMutable<uint64_t>();
}
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
const RowSet& rows,
VectorPtr* result) {
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
Loading
Loading