Allow for dictionary encoded vector ingestion #85

Open · wants to merge 1 commit into base: main
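For context, a minimal sketch of the kind of dictionary-encoded array vector this PR lets the writer ingest directly. This is not part of the diff; `pool` is an assumed velox::memory::MemoryPool* and `arrayVector` an assumed existing velox::ArrayVector, while AlignedBuffer::allocate and BaseVector::wrapInDictionary are the standard Velox helpers:

// Rows 0-1 reuse array 0, rows 2-3 reuse array 1, row 4 uses array 2, so the
// indices are non-decreasing and the new fast path below can apply.
auto indices = velox::AlignedBuffer::allocate<velox::vector_size_t>(5, pool);
auto* rawIndices = indices->asMutable<velox::vector_size_t>();
rawIndices[0] = 0; rawIndices[1] = 0;
rawIndices[2] = 1; rawIndices[3] = 1;
rawIndices[4] = 2;
auto dictVector = velox::BaseVector::wrapInDictionary(
    /*nulls=*/nullptr, indices, /*size=*/5, arrayVector);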
91 changes: 90 additions & 1 deletion dwio/nimble/velox/FieldWriter.cpp
@@ -21,7 +21,9 @@
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::nimble {

@@ -1037,7 +1039,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
const velox::ArrayVector* array;
// To unwrap the dictionary vector, cast to DictionaryVector<ComplexType>
// before extracting the underlying ArrayVector.
const auto dictionaryVector =
vector->as<velox::DictionaryVector<velox::ComplexType>>();
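// Pass the dictionary through untouched only when it wraps an ArrayVector
// and its indices are already ordered; otherwise fall back to the regular
// dedup path in ingestLengthsOffsets below.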
if (dictionaryVector &&
dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
array = ingestLengthsOffsetsAlreadyEncoded(
*dictionaryVector, ranges, childFilteredRanges);
} else {
array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
}
if (childFilteredRanges.size() > 0) {
elements_->write(array->elements(), childFilteredRanges);
}
@@ -1068,6 +1082,81 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
velox::VectorPtr cachedValue_;
velox::vector_size_t cachedSize_;

/*
* Check whether the dictionary is validly run-length encoded.
* A dictionary is valid if its offsets, taken in order, are
* non-decreasing. Two or more offsets are equal when the
* dictionary has been deduped (the values vector is smaller
* as a result).
* The read side expects offsets to be ordered for caching,
* so we need to ensure they are ordered if we are going to
* pass the dictionary through without applying any offset
* dedup logic. Dictionaries of size 0 or 1 are always
* considered run-length encoded, since there are 0 or 1
* offsets to validate.
*/
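// Illustrative example (hypothetical index buffers, not from this diff):
//   indices = [0, 0, 1, 1, 2] -> valid: non-decreasing
//   indices = [0, 2, 1]       -> invalid: 1 appears after 2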
bool isDictionaryValidRunLengthEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
const velox::vector_size_t* indices =
dictionaryVector.indices()->template as<velox::vector_size_t>();
for (int i = 1; i < dictionaryVector.size(); ++i) {
if (indices[i] < indices[i - 1]) {
return false;
}
}

return true;
}

velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
const OrderedRanges& ranges,
OrderedRanges& filteredRanges) {
auto size = ranges.size();
offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);

auto& offsetsData = offsetsStream_.mutableData();
auto& lengthsData = lengthsStream_.mutableData();
auto& nonNulls = offsetsStream_.mutableNonNulls();

const velox::vector_size_t* offsets =
dictionaryVector.indices()->template as<velox::vector_size_t>();
auto valuesArrayVector =
dictionaryVector.valueVector()->template as<velox::ArrayVector>();

auto previousOffset = -1;
auto ingestDictionaryIndex = [&](auto index) {
// Only write a new length entry when this row references a different
// dictionary value than the previous one; repeated offsets reuse it.
if (previousOffset < 0 || offsetsData.empty() ||
offsets[index] != previousOffset) {
auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
auto length = valuesArrayVector->sizeAt(offsets[index]);
lengthsData.push_back(length);
if (length > 0) {
filteredRanges.add(arrayOffset, length);
}
++nextOffset_;
}

offsetsData.push_back(nextOffset_ - 1);
previousOffset = offsets[index];
};
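// Illustrative walk-through (hypothetical values, not from this diff): for
// three non-null rows with offsets = [5, 5, 7] and nextOffset_ == 0 on entry,
// the lambda writes lengthsData = [sizeAt(5), sizeAt(7)] and
// offsetsData = [0, 0, 1]; the repeated middle row reuses the previous entry
// instead of adding a new length.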

if (dictionaryVector.mayHaveNulls()) {
ranges.applyEach([&](auto index) {
auto notNull = !dictionaryVector.isNullAt(index);
nonNulls.push_back(notNull);
if (notNull) {
ingestDictionaryIndex(index);
}
});
} else {
ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
}
// Invalidate the dedup cache, since this passthrough path did not update it.
cached_ = false;
return valuesArrayVector;
}

template <typename Vector>
void ingestLengthsOffsetsByElements(
const velox::ArrayVector* array,