2 changes: 2 additions & 0 deletions velox/connectors/CMakeLists.txt
@@ -32,3 +32,5 @@ endif()
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

add_subdirectory(lakehouse)
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
@@ -218,7 +218,9 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
fsStats_,
fileHandleFactory_,
executor_,
scanSpec_);
scanSpec_,
expressionEvaluator_,
totalRemainingFilterTime_);
}

std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
@@ -318,6 +320,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

splitReader_ = createSplitReader();

if (!bucketChannels.empty()) {
splitReader_->setBucketConversion(std::move(bucketChannels));
}
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
@@ -159,7 +159,7 @@ class HiveDataSource : public DataSource {
subfields_;
common::SubfieldFilters filters_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
8 changes: 6 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
@@ -84,7 +84,9 @@ std::unique_ptr<SplitReader> SplitReader::create(
const std::shared_ptr<filesystems::File::IoStats>& fsStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec) {
const std::shared_ptr<common::ScanSpec>& scanSpec,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime) {
// Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") {
@@ -99,7 +101,9 @@ std::unique_ptr<SplitReader> SplitReader::create(
fsStats,
fileHandleFactory,
executor,
scanSpec);
scanSpec,
expressionEvaluator,
totalRemainingFilterTime);
} else {
return std::unique_ptr<SplitReader>(new SplitReader(
hiveSplit,
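Note: the dispatch above keys on the split's custom metadata. A minimal illustrative sketch of tagging a split so that SplitReader::create takes the Iceberg branch follows; the connector id, file path, and fileSizeInBytes variable are assumptions for illustration, not part of this PR.

auto split = std::make_shared<HiveConnectorSplit>(
    "hive-connector-id",                  // connector id (assumed)
    "/warehouse/db/tbl/data-001.parquet", // file path (assumed)
    dwio::common::FileFormat::PARQUET,
    /*start=*/0,
    /*length=*/fileSizeInBytes);          // assumed to be known, as for the delete split below
split->customSplitInfo["table_format"] = "hive-iceberg";
// SplitReader::create(...) now returns the Iceberg split reader for this split.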
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.h
@@ -67,7 +67,9 @@ class SplitReader {
const std::shared_ptr<filesystems::File::IoStats>& fsStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec);
const std::shared_ptr<common::ScanSpec>& scanSpec,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime);

virtual ~SplitReader() = default;

9 changes: 6 additions & 3 deletions velox/connectors/hive/iceberg/CMakeLists.txt
@@ -14,12 +14,15 @@

velox_add_library(
velox_hive_iceberg_splitreader
EqualityDeleteFileReader.cpp
FilterUtil.cpp
IcebergDeleteFile.cpp
IcebergSplitReader.cpp
IcebergSplit.cpp
PositionalDeleteFileReader.cpp
)
PositionalDeleteFileReader.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector Folly::folly)
velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
velox_dwio_common Folly::folly)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
224 changes: 224 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
@@ -0,0 +1,224 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/core/Expressions.h"
#include "velox/dwio/common/ReaderFactory.h"

using namespace facebook::velox::common;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;

namespace facebook::velox::connector::hive::iceberg {

static constexpr int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics> ioStats,
const std::shared_ptr<filesystems::File::IoStats>& fsStats,
const std::string& connectorId)
: deleteFile_(deleteFile),
baseFileSchema_(baseFileSchema),
fileHandleFactory_(fileHandleFactory),
pool_(connectorQueryCtx->memoryPool()),
deleteSplit_(nullptr),
deleteRowReader_(nullptr) {
VELOX_CHECK_EQ(deleteFile_.content, FileContent::kEqualityDeletes);

if (deleteFile_.recordCount == 0) {
return;
}

// TODO: push down a filter if a previous delete file contains this one. E.g.
// if a previous equality delete file has a=1 and this file also contains
// column a, then a != 1 can be pushed down as a filter when reading this
// delete file.

deleteSplit_ = std::make_shared<HiveConnectorSplit>(
connectorId,
deleteFile_.filePath,
deleteFile_.fileFormat,
0,
deleteFile_.fileSizeInBytes);

// Create the Reader and RowReader for the equality delete file

dwio::common::ReaderOptions deleteReaderOpts(pool_);
configureReaderOptions(
hiveConfig,
connectorQueryCtx,
nullptr,
deleteSplit_,
{},
deleteReaderOpts);

const FileHandleKey fileHandleKey{
.filename = deleteFile_.filePath,
.tokenProvider = connectorQueryCtx->fsTokenProvider()};
auto deleteFileHandleCachePtr = fileHandleFactory_->generate(fileHandleKey);
auto deleteFileInput = createBufferedInput(
*deleteFileHandleCachePtr,
deleteReaderOpts,
connectorQueryCtx,
ioStats,
fsStats,
executor);

auto deleteReader =
dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
->createReader(std::move(deleteFileInput), deleteReaderOpts);

// For now, we assume only the delete columns are written in the delete file
deleteFileRowType_ = deleteReader->rowType();
auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
scanSpec->addAllChildFields(deleteFileRowType_->asRow());

dwio::common::RowReaderOptions deleteRowReaderOpts;
configureRowReaderOptions(
{},
scanSpec,
nullptr,
deleteFileRowType_,
deleteSplit_,
hiveConfig,
connectorQueryCtx->sessionProperties(),
deleteRowReaderOpts);

deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
SubfieldFilters& subfieldFilters,
std::vector<core::TypedExprPtr>& expressionInputs) {
VELOX_CHECK(deleteRowReader_);
VELOX_CHECK(deleteSplit_);

if (!deleteValuesOutput_) {
deleteValuesOutput_ = BaseVector::create(deleteFileRowType_, 0, pool_);
}

// TODO: verify that the field is an Iceberg RowId. Velox currently doesn't
// support pushing down filters to non-RowId types, i.e. subfields of Array
// or Map.
if (deleteFileRowType_->size() == 1) {
// Construct the NOT IN filter that can be pushed down to the base file
// readers, then update the base file ScanSpec.
buildDomainFilter(subfieldFilters);
} else {
// Build the filter functions that will be evaluated after the base file
// rows are read.
buildFilterFunctions(expressionInputs);
}

deleteSplit_.reset();
}

void EqualityDeleteFileReader::buildDomainFilter(
SubfieldFilters& subfieldFilters) {
std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
auto name = baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[0])
->fullName();
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto vector =
std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);

auto typeKind = vector->type()->kind();
VELOX_CHECK(
typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
"Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
name,
typeKind);

auto notExistsFilter =
createNotInFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
filter = filter->mergeWith(notExistsFilter.get());
}

if (filter->kind() != FilterKind::kAlwaysTrue) {
if (subfieldFilters.find(common::Subfield(name)) != subfieldFilters.end()) {
subfieldFilters[common::Subfield(name)] =
subfieldFilters[common::Subfield(name)]->mergeWith(filter.get());
} else {
subfieldFilters[common::Subfield(name)] = std::move(filter);
}
}
}

void EqualityDeleteFileReader::buildFilterFunctions(
std::vector<core::TypedExprPtr>& expressionInputs) {
auto numDeleteFields = deleteFileRowType_->size();
VELOX_CHECK_GT(
numDeleteFields,
0,
"Iceberg equality delete file should have at least one field.");

// TODO: logical expression simplifications
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
auto numDeletedValues = rowVector->childAt(0)->size();

for (int i = 0; i < numDeletedValues; i++) {
std::vector<core::TypedExprPtr> disjunctInputs;

for (int j = 0; j < numDeleteFields; j++) {
auto type = deleteFileRowType_->childAt(j);
auto name =
baseFileSchema_->childByFieldId(deleteFile_.equalityFieldIds[j])
->fullName();
auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));

std::vector<core::TypedExprPtr> isNotEqualInputs;
isNotEqualInputs.push_back(
std::make_shared<FieldAccessTypedExpr>(type, name));
isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
// TODO: generalize this to support different engines. Currently, only
// Presto "neq" is supported. Spark does not register the "neq" function
// but does support "not" and "equalto" functions.
auto isNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");

disjunctInputs.push_back(isNotEqualExpr);
}

auto disjunctNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), disjunctInputs, "or");
expressionInputs.push_back(disjunctNotEqualExpr);
}
}
}

} // namespace facebook::velox::connector::hive::iceberg
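
To make the two paths in readDeleteValues concrete, here is a small self-contained sketch (plain C++, independent of the Velox APIs used above) of the predicate the generated filters encode: a single-column delete file yields a NOT IN filter on that column, while a multi-column delete file yields, per delete row, a disjunction of inequalities, all of which must hold for a base row to survive.

#include <cassert>
#include <string>
#include <vector>

// One delete row: values for the equality columns (a BIGINT and a VARCHAR here).
struct DeleteRow {
  int64_t a;
  std::string b;
};

// A base-file row survives the equality deletes iff, for every delete row, it
// differs on at least one equality column: AND over delete rows of
// (a != d.a OR b != d.b). This mirrors the "neq"/"or" expressions built in
// buildFilterFunctions().
bool keepRow(int64_t a, const std::string& b, const std::vector<DeleteRow>& deletes) {
  for (const auto& d : deletes) {
    if (a == d.a && b == d.b) {
      return false; // matches a delete row, so the base row is filtered out
    }
  }
  return true;
}

int main() {
  std::vector<DeleteRow> deletes = {{1, "x"}, {2, "y"}};
  assert(keepRow(1, "y", deletes));  // (1 != 1 OR "y" != "x") holds
  assert(!keepRow(2, "y", deletes)); // matches the second delete row
  return 0;
}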
87 changes: 87 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.h
@@ -0,0 +1,87 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/common/Reader.h"
#include "velox/expression/Expr.h"

namespace facebook::velox::connector::hive::iceberg {

class IcebergDeleteFile;

using SubfieldFilters =
std::unordered_map<common::Subfield, std::unique_ptr<common::Filter>>;

class EqualityDeleteFileReader {
public:
EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics> ioStats,
const std::shared_ptr<filesystems::File::IoStats>& fsStats,
const std::string& connectorId);

/// Reads the delete values from the equality delete file, and interprets them
/// as filters for the base file reader.
///
/// @param subfieldFilters The SubfieldFilters built from the delete values,
/// which can be pushed down to the base file RowReader when the equality
/// delete file contains a single subfield of Iceberg RowId type.
/// @param typedExpressions The TypedExprs built from the delete values, which
/// will be evaluated by the connector DataSource after rows are read from the
/// base file RowReader.
void readDeleteValues(
SubfieldFilters& subfieldFilters,
std::vector<core::TypedExprPtr>& typedExpressions);

private:
void buildDomainFilter(SubfieldFilters& subfieldFilters);

void buildFilterFunctions(std::vector<core::TypedExprPtr>& expressionInputs);

// The equality delete file to read
const IcebergDeleteFile& deleteFile_;
// The schema of the base file as a TypeWithId tree. In addition to the
// fields already included in the base file ScanSpec, it also contains the
// fields that appear in the equality delete file but not in the base file
// ScanSpec.
const std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema_;

// The factory that caches file handles; used to obtain the file handle of
// the delete file.
FileHandleFactory* const fileHandleFactory_;
memory::MemoryPool* const pool_;

// The split of the equality delete file to be processed by the delete file
// RowReader.
std::shared_ptr<const HiveConnectorSplit> deleteSplit_;
// The RowType of the equality delete file
RowTypePtr deleteFileRowType_;
// The RowReader to read the equality delete file
std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
// The output vector to hold the delete values
VectorPtr deleteValuesOutput_;
};

} // namespace facebook::velox::connector::hive::iceberg
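
A hedged usage sketch of the new class: the arguments are assumed to be prepared by the enclosing split reader, and the helper function name is illustrative, not part of this PR.

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

namespace facebook::velox::connector::hive::iceberg {

// Sketch: read one equality delete file and collect its filters. All
// arguments are assumed to be prepared by the enclosing split reader.
void collectEqualityDeleteFilters(
    const IcebergDeleteFile& deleteFile,
    const std::shared_ptr<const dwio::common::TypeWithId>& baseFileSchema,
    FileHandleFactory* fileHandleFactory,
    folly::Executor* executor,
    const ConnectorQueryCtx* connectorQueryCtx,
    const std::shared_ptr<const HiveConfig>& hiveConfig,
    const std::shared_ptr<io::IoStatistics>& ioStats,
    const std::shared_ptr<filesystems::File::IoStats>& fsStats,
    const std::string& connectorId,
    SubfieldFilters& subfieldFilters,
    std::vector<core::TypedExprPtr>& typedExpressions) {
  EqualityDeleteFileReader reader(
      deleteFile,
      baseFileSchema,
      fileHandleFactory,
      executor,
      connectorQueryCtx,
      hiveConfig,
      ioStats,
      fsStats,
      connectorId);
  // Single-column delete files become pushed-down NOT IN domain filters;
  // multi-column files become residual "neq"/"or" expressions that are
  // evaluated after the base file rows are read.
  reader.readDeleteValues(subfieldFilters, typedExpressions);
}

} // namespace facebook::velox::connector::hive::iceberg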