2 changes: 2 additions & 0 deletions velox/connectors/CMakeLists.txt
@@ -32,3 +32,5 @@ endif()
 if(${VELOX_BUILD_TESTING})
   add_subdirectory(tests)
 endif()
+
+add_subdirectory(lakehouse)
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
@@ -218,7 +218,9 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
       fsStats_,
       fileHandleFactory_,
       executor_,
-      scanSpec_);
+      scanSpec_,
+      expressionEvaluator_,
+      totalRemainingFilterTime_);
 }
 
 std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
@@ -318,6 +320,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
   }
 
   splitReader_ = createSplitReader();
+
   if (!bucketChannels.empty()) {
     splitReader_->setBucketConversion(std::move(bucketChannels));
   }
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
@@ -159,7 +159,7 @@ class HiveDataSource : public DataSource {
       subfields_;
   common::SubfieldFilters filters_;
   std::shared_ptr<common::MetadataFilter> metadataFilter_;
-  std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
+  std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
   RowVectorPtr emptyOutput_;
   dwio::common::RuntimeStatistics runtimeStats_;
   std::atomic<uint64_t> totalRemainingFilterTime_{0};
8 changes: 6 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
@@ -84,7 +84,9 @@ std::unique_ptr<SplitReader> SplitReader::create(
     const std::shared_ptr<filesystems::File::IoStats>& fsStats,
     FileHandleFactory* fileHandleFactory,
     folly::Executor* executor,
-    const std::shared_ptr<common::ScanSpec>& scanSpec) {
+    const std::shared_ptr<common::ScanSpec>& scanSpec,
+    core::ExpressionEvaluator* expressionEvaluator,
+    std::atomic<uint64_t>& totalRemainingFilterTime) {
   // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
   if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
       hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") {
@@ -99,7 +101,9 @@
         fsStats,
         fileHandleFactory,
         executor,
-        scanSpec);
+        scanSpec,
+        expressionEvaluator,
+        totalRemainingFilterTime);
   } else {
     return std::unique_ptr<SplitReader>(new SplitReader(
         hiveSplit,
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.h
@@ -67,7 +67,9 @@ class SplitReader {
       const std::shared_ptr<filesystems::File::IoStats>& fsStats,
       FileHandleFactory* fileHandleFactory,
       folly::Executor* executor,
-      const std::shared_ptr<common::ScanSpec>& scanSpec);
+      const std::shared_ptr<common::ScanSpec>& scanSpec,
+      core::ExpressionEvaluator* expressionEvaluator,
+      std::atomic<uint64_t>& totalRemainingFilterTime);
 
   virtual ~SplitReader() = default;

224 changes: 224 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
@@ -0,0 +1,224 @@
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/core/Expressions.h"
#include "velox/dwio/common/ReaderFactory.h"

using namespace facebook::velox::common;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;

namespace facebook::velox::connector::hive::iceberg {

static constexpr int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
    const IcebergDeleteFile& deleteFile,
    std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
    FileHandleFactory* fileHandleFactory,
    folly::Executor* executor,
    const ConnectorQueryCtx* connectorQueryCtx,
    const std::shared_ptr<const HiveConfig>& hiveConfig,
    const std::shared_ptr<io::IoStatistics> ioStats,
    const std::shared_ptr<filesystems::File::IoStats>& fsStats,
    const std::string& connectorId)
    : deleteFile_(deleteFile),
      baseFileSchema_(baseFileSchema),
      fileHandleFactory_(fileHandleFactory),
      pool_(connectorQueryCtx->memoryPool()),
      deleteSplit_(nullptr),
      deleteRowReader_(nullptr) {
  VELOX_CHECK_EQ(deleteFile_.content, FileContent::kEqualityDeletes);

  if (deleteFile_.recordCount == 0) {
    return;
  }

  // TODO: push down the filter if a previous delete file contains this one,
  // e.g. if a previous equality delete file has a = 1 and this file also
  // contains column a, then a != 1 can be pushed down as a filter when
  // reading this delete file.

  deleteSplit_ = std::make_shared<HiveConnectorSplit>(
      connectorId,
      deleteFile_.filePath,
      deleteFile_.fileFormat,
      0,
      deleteFile_.fileSizeInBytes);

  // Create the Reader and RowReader for the equality delete file.

  dwio::common::ReaderOptions deleteReaderOpts(pool_);
  configureReaderOptions(
      hiveConfig,
      connectorQueryCtx,
      nullptr,
      deleteSplit_,
      {},
      deleteReaderOpts);

  const FileHandleKey fileHandleKey{
      .filename = deleteFile_.filePath,
      .tokenProvider = connectorQueryCtx->fsTokenProvider()};
  auto deleteFileHandleCachePtr = fileHandleFactory_->generate(fileHandleKey);
  auto deleteFileInput = createBufferedInput(
      *deleteFileHandleCachePtr,
      deleteReaderOpts,
      connectorQueryCtx,
      ioStats,
      fsStats,
      executor);

  auto deleteReader =
      dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
          ->createReader(std::move(deleteFileInput), deleteReaderOpts);

  // For now, we assume only the delete columns are written in the delete
  // file.
  deleteFileRowType_ = deleteReader->rowType();
  auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
  scanSpec->addAllChildFields(deleteFileRowType_->asRow());

  dwio::common::RowReaderOptions deleteRowReaderOpts;
  configureRowReaderOptions(
      {},
      scanSpec,
      nullptr,
      deleteFileRowType_,
      deleteSplit_,
      hiveConfig,
      connectorQueryCtx->sessionProperties(),
      deleteRowReaderOpts);

  deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
    SubfieldFilters& subfieldFilters,
    std::vector<core::TypedExprPtr>& expressionInputs) {
  VELOX_CHECK(deleteRowReader_);
  VELOX_CHECK(deleteSplit_);

  if (!deleteValuesOutput_) {
    deleteValuesOutput_ = BaseVector::create(deleteFileRowType_, 0, pool_);
  }

  // TODO: verify that the field is an Iceberg RowId. Velox currently doesn't
  // support pushing down filters to non-RowId types, i.e. sub-fields of Array
  // or Map.
  if (deleteFileRowType_->size() == 1) {
    // Construct the NOT IN list filter that can be pushed down to the base
    // file readers, then update the baseFileScanSpec.
    buildDomainFilter(subfieldFilters);
  } else {
    // Build the filter functions that will be evaluated after the base file
    // rows are read.
    buildFilterFunctions(expressionInputs);
  }

  deleteSplit_.reset();
}

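// Illustrative sketch of the single-column case (an editor's example, not
// code from this change): an equality delete file over column 'a' whose
// delete values are {1, 3} produces a domain filter equivalent to
// 'a NOT IN (1, 3)', which is merged into the SubfieldFilters pushed down to
// the base file readers.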
void EqualityDeleteFileReader::buildDomainFilter(
    SubfieldFilters& subfieldFilters) {
  std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
  auto name = baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[0])
                  ->fullName();
  while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
    if (deleteValuesOutput_->size() == 0) {
      continue;
    }

    deleteValuesOutput_->loadedVector();
    auto vector =
        std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);

    auto typeKind = vector->type()->kind();
    VELOX_CHECK(
        typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
        "Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
        name,
        typeKind);

    auto notExistsFilter =
        createNotInFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
    filter = filter->mergeWith(notExistsFilter.get());
  }

  if (filter->kind() != FilterKind::kAlwaysTrue) {
    if (subfieldFilters.find(common::Subfield(name)) !=
        subfieldFilters.end()) {
      subfieldFilters[common::Subfield(name)] =
          subfieldFilters[common::Subfield(name)]->mergeWith(filter.get());
    } else {
      subfieldFilters[common::Subfield(name)] = std::move(filter);
    }
  }
}

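// Illustrative sketch of the multi-column case (an editor's example, not code
// from this change): for equality fields (a, b) and delete rows (1, 'x') and
// (2, 'y'), this emits one disjunction per deleted row,
//   a != 1 OR b != 'x'
//   a != 2 OR b != 'y'
// and the collected expressions are presumably AND-ed by the caller, so a
// base file row survives only if it matches none of the deleted rows.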
void EqualityDeleteFileReader::buildFilterFunctions(
    std::vector<core::TypedExprPtr>& expressionInputs) {
  auto numDeleteFields = deleteFileRowType_->size();
  VELOX_CHECK_GT(
      numDeleteFields,
      0,
      "Iceberg equality delete file should have at least one field.");

  // TODO: logical expression simplifications
  while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
    if (deleteValuesOutput_->size() == 0) {
      continue;
    }

    deleteValuesOutput_->loadedVector();
    auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
    auto numDeletedValues = rowVector->childAt(0)->size();

    for (int i = 0; i < numDeletedValues; i++) {
      std::vector<core::TypedExprPtr> disjunctInputs;

      for (int j = 0; j < numDeleteFields; j++) {
        auto type = deleteFileRowType_->childAt(j);
        auto name =
            baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[j])
                ->fullName();
        auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));

        std::vector<core::TypedExprPtr> isNotEqualInputs;
        isNotEqualInputs.push_back(
            std::make_shared<FieldAccessTypedExpr>(type, name));
        isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
        // TODO: generalize this to support different engines. Currently, only
        // Presto "neq" is supported. Spark does not register the "neq"
        // function but does support the "not" and "equalto" functions.
        auto isNotEqualExpr =
            std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");

        disjunctInputs.push_back(isNotEqualExpr);
      }

      auto disjunctNotEqualExpr =
          std::make_shared<CallTypedExpr>(BOOLEAN(), disjunctInputs, "or");
      expressionInputs.push_back(disjunctNotEqualExpr);
    }
  }
}

} // namespace facebook::velox::connector::hive::iceberg
87 changes: 87 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.h
@@ -0,0 +1,87 @@
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/common/Reader.h"
#include "velox/expression/Expr.h"

namespace facebook::velox::connector::hive::iceberg {

class IcebergDeleteFile;

using SubfieldFilters =
    std::unordered_map<common::Subfield, std::unique_ptr<common::Filter>>;

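/// Reads an Iceberg equality delete file and converts its contents into
/// filters over the base data file. A minimal usage sketch (an editor's
/// illustration based on the declarations below, not code from this change):
///
///   EqualityDeleteFileReader reader(
///       deleteFile, baseFileSchema, fileHandleFactory, executor,
///       connectorQueryCtx, hiveConfig, ioStats, fsStats, connectorId);
///   SubfieldFilters filters;
///   std::vector<core::TypedExprPtr> exprs;
///   reader.readDeleteValues(filters, exprs);
///   // 'filters' can be pushed down into the base file scan; 'exprs' are
///   // evaluated as remaining filters after base file rows are read.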
class EqualityDeleteFileReader {
 public:
  EqualityDeleteFileReader(
      const IcebergDeleteFile& deleteFile,
      std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
      FileHandleFactory* fileHandleFactory,
      folly::Executor* executor,
      const ConnectorQueryCtx* connectorQueryCtx,
      const std::shared_ptr<const HiveConfig>& hiveConfig,
      const std::shared_ptr<io::IoStatistics> ioStats,
      const std::shared_ptr<filesystems::File::IoStats>& fsStats,
      const std::string& connectorId);

  /// Reads the delete values from the equality delete file and interprets
  /// them as filters for the base file reader.
  ///
  /// @param subfieldFilters The built SubfieldFilters that can be pushed down
  /// to the base file RowReader when the equality delete file contains only a
  /// single subfield of Iceberg RowId type.
  /// @param typedExpressions The built TypedExprs that will be evaluated by
  /// the connector DataSource after rows are read from the base file
  /// RowReader.
  void readDeleteValues(
      SubfieldFilters& subfieldFilters,
      std::vector<core::TypedExprPtr>& typedExpressions);

 private:
  void buildDomainFilter(SubfieldFilters& subfieldFilters);

  void buildFilterFunctions(std::vector<core::TypedExprPtr>& expressionInputs);

  // The equality delete file to read.
  const IcebergDeleteFile& deleteFile_;
  // The schema of the base file as a TypeWithId tree. In addition to the
  // fields included in the base file ScanSpec, it also contains the fields
  // that appear in the equality delete file but not in the base file
  // ScanSpec.
  const std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema_;

  // The factory for cached file handles, used to obtain the file handle of
  // the delete file.
  FileHandleFactory* const fileHandleFactory_;
  memory::MemoryPool* const pool_;

  // The split of the equality delete file to be processed by the delete file
  // RowReader.
  std::shared_ptr<const HiveConnectorSplit> deleteSplit_;
  // The RowType of the equality delete file.
  RowTypePtr deleteFileRowType_;
  // The RowReader to read the equality delete file.
  std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
  // The output vector to hold the delete values.
  VectorPtr deleteValuesOutput_;
};

} // namespace facebook::velox::connector::hive::iceberg