Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test to do map->flatmap and then passthrough that same flatmap #89

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 237 additions & 3 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "dwio/nimble/velox/SchemaTypes.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::nimble {

Expand Down Expand Up @@ -745,6 +747,35 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
}
};

// Wraps the value writer for a single flat-map key when the incoming data
// arrives already flattened (passthrough). Every written row is present in
// the map for this key, so the in-map stream is extended with `true` for
// each row in the written ranges.
class FlatMapPassthroughValueFieldWriter {
 public:
  FlatMapPassthroughValueFieldWriter(
      FieldWriterContext& context,
      const StreamDescriptorBuilder& inMapDescriptor,
      std::unique_ptr<FieldWriter> valueField)
      : valueField_{std::move(valueField)},
        inMapStream_{context.createContentStreamData<bool>(inMapDescriptor)} {}

  // Marks every row in `ranges` as present in the map, then forwards the
  // column to the underlying value writer.
  void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
    auto& inMapData = inMapStream_.mutableData();
    const auto grownSize = inMapData.size() + ranges.size();
    inMapData.resize(grownSize, true);
    valueField_->write(vector, ranges);
  }

  // Clears the in-map stream and the wrapped value writer's state.
  void reset() {
    inMapStream_.reset();
    valueField_->reset();
  }

  void close() {
    valueField_->close();
  }

 private:
  std::unique_ptr<FieldWriter> valueField_;
  ContentStreamData<bool>& inMapStream_;
};

class FlatMapValueFieldWriter {
public:
FlatMapValueFieldWriter(
Expand Down Expand Up @@ -820,6 +851,81 @@ class FlatMapFieldWriter : public FieldWriter {

// Dispatches on the input encoding: a ROW vector means the map was already
// flattened upstream and is passed through; anything else is ingested as a
// regular MAP vector.
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
    override {
  if (vector->type()->kind() == velox::TypeKind::ROW) {
    ingestFlattenedMap(vector, ranges);
  } else {
    ingestMap(vector, ranges);
  }
}

// Registers a new passthrough writer for `key`: creates the underlying value
// writer, attaches it as a child of the flat-map schema node, notifies the
// field-added handler (if configured), and stores the wrapper for lookups.
FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
    const std::string& key) {
  auto valueWriter = FieldWriter::create(context_, valueType_);
  auto& inMapDescriptor =
      typeBuilder_->asFlatMap().addChild(key, valueWriter->typeBuilder());
  if (context_.flatmapFieldAddedEventHandler) {
    context_.flatmapFieldAddedEventHandler(
        *typeBuilder_, key, *valueWriter->typeBuilder());
  }
  // Key is known to be absent here (only called on first flattened write).
  auto& slot = currentPassthroughFields_[key];
  slot = std::make_unique<FlatMapPassthroughValueFieldWriter>(
      context_, inMapDescriptor, std::move(valueWriter));
  return *slot;
}

// Looks up the passthrough writer previously registered for `key`. The key
// set is fixed after the first flattened write, so a miss is a logic error.
FlatMapPassthroughValueFieldWriter& findPassthroughValueFieldWriter(
    const std::string& key) {
  auto it = currentPassthroughFields_.find(key);
  NIMBLE_ASSERT(
      it != currentPassthroughFields_.end(),
      "Field writer must already exist in map");
  return *it->second;
}

// Ingests a map that arrives pre-flattened as a ROW vector: each ROW child
// is a value column keyed by its child name. The first flattened write (with
// non-empty ranges) fixes the key set; subsequent writes must present the
// same keys or the writer throws.
void ingestFlattenedMap(
    const velox::VectorPtr& vector,
    const OrderedRanges& ranges) {
  NIMBLE_ASSERT(
      currentValueFields_.empty() && allValueFields_.empty(),
      "Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
  const auto& rowVector = vector->as<velox::RowVector>();
  NIMBLE_ASSERT(
      rowVector,
      "Unexpected vector type. Vector must be a decoded ROW vector.");
  nullsStream_.ensureNullsCapacity(rowVector->mayHaveNulls(), ranges.size());
  const auto& keyNames = rowVector->type()->asRow().names();
  const auto& valueColumns = rowVector->children();

  // Collect the non-null top-level rows; only those flow to the children.
  OrderedRanges nonNullRanges;
  iterateNonNullIndices<true>(
      ranges, nullsStream_.mutableNonNulls(), Flat{vector}, [&](auto offset) {
        nonNullRanges.add(offset, 1);
      });

  // Nothing to forward when every top-level row was null/filtered.
  if (nonNullRanges.size() == 0) {
    return;
  }

  // Create keys only on the first write with valid ranges; afterwards each
  // key must already be registered.
  const bool firstWrite = currentPassthroughFields_.empty();
  for (size_t i = 0; i < keyNames.size(); ++i) {
    const auto& key = keyNames[i];
    auto& fieldWriter = firstWrite ? createPassthroughValueFieldWriter(key)
                                   : findPassthroughValueFieldWriter(key);
    fieldWriter.write(valueColumns[i], nonNullRanges);
  }
}

void ingestMap(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
NIMBLE_ASSERT(
currentPassthroughFields_.empty(),
"Mixing map and flatmap vectors in the FlatMapFieldWriter is not supported");
auto size = ranges.size();
const velox::vector_size_t* offsets;
const velox::vector_size_t* lengths;
Expand Down Expand Up @@ -910,20 +1016,28 @@ class FlatMapFieldWriter : public FieldWriter {
field.second->reset();
}

for (auto& field : currentPassthroughFields_) {
field.second->reset();
}

nullsStream_.reset();
nonNullCount_ = 0;
currentValueFields_.clear();
}

// Closes all child writers accumulated over the writer's lifetime.
// NOTE: the scraped diff interleaved the old and new versions of the empty
// check; this is the reconstructed new-side version, which also accounts for
// passthrough fields.
void close() override {
  // Add dummy node so we can preserve schema of an empty flat map
  // when no fields are written.
  if (allValueFields_.empty() && currentPassthroughFields_.empty()) {
    auto valueField = FieldWriter::create(context_, valueType_);
    typeBuilder_->asFlatMap().addChild("", valueField->typeBuilder());
  } else {
    for (auto& pair : allValueFields_) {
      pair.second->close();
    }
    for (auto& pair : currentPassthroughFields_) {
      pair.second->close();
    }
  }
}

Expand Down Expand Up @@ -967,6 +1081,12 @@ class FlatMapFieldWriter : public FieldWriter {
NullsStreamData& nullsStream_;
// This map stores the FlatMapValue fields used in the current flush unit.
folly::F14FastMap<KeyType, FlatMapValueFieldWriter*> currentValueFields_;

// This map stores the FlatMapPassthrough fields.
folly::F14FastMap<
std::string,
std::unique_ptr<FlatMapPassthroughValueFieldWriter>>
currentPassthroughFields_;
const std::shared_ptr<const velox::dwio::common::TypeWithId>& valueType_;
uint64_t nonNullCount_ = 0;
// This map stores all FlatMapValue fields encountered by the VeloxWriter
Expand Down Expand Up @@ -1037,7 +1157,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
OrderedRanges childFilteredRanges;
auto array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
const velox::ArrayVector* array;
// To unwrap the dictionaryVector we need to cast into ComplexType before
// extracting value arrayVector
const auto dictionaryVector =
vector->as<velox::DictionaryVector<velox::ComplexType>>();
if (dictionaryVector &&
dictionaryVector->valueVector()->template as<velox::ArrayVector>() &&
isDictionaryValidRunLengthEncoded(*dictionaryVector)) {
array = ingestLengthsOffsetsAlreadyEncoded(
*dictionaryVector, ranges, childFilteredRanges);
} else {
array = ingestLengthsOffsets(vector, ranges, childFilteredRanges);
}
if (childFilteredRanges.size() > 0) {
elements_->write(array->elements(), childFilteredRanges);
}
Expand Down Expand Up @@ -1068,6 +1200,106 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
velox::VectorPtr cachedValue_;
velox::vector_size_t cachedSize_;

/*
 * Returns true when the dictionary indices form a non-decreasing sequence,
 * i.e. the dictionary is a valid run-length encoding. Equal neighboring
 * indices are allowed: deduping shrinks the values vector, so consecutive
 * rows may share an index. The read side expects offsets to be ordered for
 * caching, so passing the dictionary through without applying any offset
 * dedup logic is only safe for such dictionaries. Dictionaries of size 0 or
 * 1 trivially qualify, since they contain at most one index to validate.
 */
bool isDictionaryValidRunLengthEncoded(
    const velox::DictionaryVector<velox::ComplexType>& dictionaryVector) {
  const auto* indices =
      dictionaryVector.indices()->template as<velox::vector_size_t>();
  const auto numRows = dictionaryVector.size();
  for (velox::vector_size_t row = 1; row < numRows; ++row) {
    if (indices[row - 1] > indices[row]) {
      return false;
    }
  }
  return true;
}

/*
 * Ingests a dictionary-encoded array vector whose indices are already
 * ordered (validated by isDictionaryValidRunLengthEncoded). Emits one
 * length entry per new distinct value element and one offset entry per
 * non-null row, and returns the dictionary's inner ArrayVector so the
 * caller can write the element ranges collected in `filteredRanges`.
 */
velox::ArrayVector* ingestLengthsOffsetsAlreadyEncoded(
    const velox::DictionaryVector<velox::ComplexType>& dictionaryVector,
    const OrderedRanges& ranges,
    OrderedRanges& filteredRanges) {
  auto size = ranges.size();
  offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);

  auto& offsetsData = offsetsStream_.mutableData();
  auto& lengthsData = lengthsStream_.mutableData();
  auto& nonNulls = offsetsStream_.mutableNonNulls();

  const velox::vector_size_t* offsets =
      dictionaryVector.indices()->template as<velox::vector_size_t>();
  auto valuesArrayVector =
      dictionaryVector.valueVector()->template as<velox::ArrayVector>();

  // -1 marks "no row ingested yet in this call"; until the first row is
  // seen, dedup falls back to comparing against the cached value from the
  // previous call (if any).
  auto previousOffset = -1;
  bool newElementIngested = false;
  auto ingestDictionaryIndex = [&](auto index) {
    bool match = false;
    // Only write length if first element or if consecutive offset is
    // different, meaning we have reached a new value element.
    if (previousOffset >= 0) {
      match = (offsets[index] == previousOffset);
    } else if (cached_) {
      // First row of this call: compare by size first (cheap), then by a
      // full element-wise compare against the cached last value.
      velox::CompareFlags flags;
      match =
          (valuesArrayVector->sizeAt(offsets[index]) == cachedSize_ &&
           valuesArrayVector
               ->compare(cachedValue_.get(), offsets[index], 0, flags)
               .value_or(-1) == 0);
    }

    if (!match) {
      // New distinct element: record its length and schedule its elements
      // for the child writer via filteredRanges.
      auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
      auto length = valuesArrayVector->sizeAt(offsets[index]);
      lengthsData.push_back(length);
      newElementIngested = true;
      if (length > 0) {
        filteredRanges.add(arrayOffset, length);
      }
      ++nextOffset_;
    }

    // Every non-null row gets an offset pointing at the latest element.
    offsetsData.push_back(nextOffset_ - 1);
    previousOffset = offsets[index];
  };

  if (dictionaryVector.mayHaveNulls()) {
    // Track per-row null bits and only ingest the non-null rows.
    ranges.applyEach([&](auto index) {
      auto notNull = !dictionaryVector.isNullAt(index);
      nonNulls.push_back(notNull);
      if (notNull) {
        ingestDictionaryIndex(index);
      }
    });
  } else {
    ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
  }

  // insert last element discovered into cache
  // (so dedup can continue seamlessly across write() calls)
  if (newElementIngested) {
    cached_ = true;
    cachedSize_ = lengthsData[lengthsData.size() - 1];
    cachedValue_->prepareForReuse();
    velox::BaseVector::CopyRange cacheRange{
        static_cast<velox::vector_size_t>(previousOffset) /* source index*/,
        0 /* target index*/,
        1 /* count*/};
    cachedValue_->copyRanges(valuesArrayVector, folly::Range(&cacheRange, 1));
  }

  return valuesArrayVector;
}

template <typename Vector>
void ingestLengthsOffsetsByElements(
const velox::ArrayVector* array,
Expand Down Expand Up @@ -1280,6 +1512,8 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

// Hands out a LocalDecodedVector bound to this context. The
// vectorDecoderVisitor hook fires on every call so callers (per
// VeloxWriterOptions, this is intended for tests) can observe how often the
// writer decodes instead of passing data through.
FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
  NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
  vectorDecoderVisitor();
  return LocalDecodedVector{*this};
}

Expand Down
8 changes: 6 additions & 2 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ struct FieldWriterContext {

explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr)
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()} {
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
resetStringBuffer();
}

Expand All @@ -65,6 +67,8 @@ struct FieldWriterContext {
std::function<void(const TypeBuilder&)> typeAddedHandler =
[](const TypeBuilder&) {};

std::function<void(void)> vectorDecoderVisitor;

LocalDecodedVector getLocalDecodedVector();
velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);

Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory()},
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ struct VeloxWriterOptions {
std::shared_ptr<folly::Executor> encodingExecutor;

bool enableChunking = false;

// This callback will be invoked on each access to getLocalDecodedVector in
// order to monitor usage of decoded vectors vs. data that is passed through
// in the writer. The default is a no-op since it's used for tests only.
std::function<void(void)> vectorDecoderVisitor = []() {};
};

} // namespace facebook::nimble
Loading