Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,6 @@
name: Linux Build using GCC

on:
push:
branches:
- main
paths:
- velox/**
- '!velox/docs/**'
- CMakeLists.txt
- CMake/**
- scripts/setup-ubuntu.sh
- scripts/setup-common.sh
- scripts/setup-versions.sh
- scripts/setup-helper-functions.sh
- .github/workflows/linux-build.yml
- .github/workflows/linux-build-base.yml

pull_request:
paths:
- velox/**
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ velox_add_library(
HiveDataSource.cpp
FileIndexReader.cpp
HiveIndexSource.cpp
HivePartitionName.cpp
HivePartitionUtil.cpp
PartitionIdGenerator.cpp
SplitReader.cpp
TableHandle.cpp
Expand All @@ -45,6 +45,7 @@ velox_link_libraries(
velox_dwio_catalog_fbhive
velox_hive_partition_function
velox_key_encoder
PUBLIC velox_hive_iceberg_splitreader
)

velox_add_library(velox_hive_partition_function HivePartitionFunction.cpp)
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,9 @@ std::string HiveConfig::schema(const config::ConfigBase* session) const {
kSchema, config_->get<std::string>(kSchema, ""));
}

// Reports whether the fanout writer mode is enabled. The connector-level
// value of kFanoutEnabled (true when unset) is handed to the session lookup
// of kFanoutEnabledSession as its default.
bool HiveConfig::fanoutEnabled(const config::ConfigBase* session) const {
  const bool connectorValue = config_->get<bool>(kFanoutEnabled, true);
  return session->get<bool>(kFanoutEnabledSession, connectorValue);
}

} // namespace facebook::velox::connector::hive
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ class HiveConfig {
static constexpr const char* kSource = "source";
static constexpr const char* kSchema = "schema";

/// Controls the writer mode: whether the fanout writer is enabled. Defaults
/// to true; setting it to false selects clustered mode.
/// Currently applies only to the Iceberg writer.
static constexpr const char* kFanoutEnabled = "fanout-enabled";
static constexpr const char* kFanoutEnabledSession = "fanout_enabled";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const config::ConfigBase* session) const;

Expand Down Expand Up @@ -364,6 +370,9 @@ class HiveConfig {
/// Schema of the query. Used for storage logging.
std::string schema(const config::ConfigBase* session) const;

/// Returns whether fanout writer mode is enabled.
bool fanoutEnabled(const config::ConfigBase* session) const;

HiveConfig(std::shared_ptr<const config::ConfigBase> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ HiveConnector::HiveConnector(
std::unique_ptr<DataSource> HiveConnector::createDataSource(
const RowTypePtr& outputType,
const ConnectorTableHandlePtr& tableHandle,
const ColumnHandleMap& columnHandles,
const std::unordered_map<std::string, ColumnHandlePtr>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
return std::make_unique<HiveDataSource>(
outputType,
Expand Down
9 changes: 8 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FieldReference.h"

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

namespace facebook::velox::connector::hive {
namespace {

Expand Down Expand Up @@ -884,7 +888,6 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
}

namespace {

core::CallTypedExprPtr replaceInputs(
const core::CallTypedExpr* call,
std::vector<core::TypedExprPtr>&& inputs) {
Expand Down Expand Up @@ -1057,6 +1060,10 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
}
} // namespace

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

core::TypedExprPtr extractFiltersFromRemainingFilter(
const core::TypedExprPtr& expr,
core::ExpressionEvaluator* evaluator,
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,6 @@ std::unique_ptr<common::Filter> createRangeFilter(
const variant& lower,
const variant& upper);

std::string makeUuid();

} // namespace facebook::velox::connector::hive
143 changes: 59 additions & 84 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,34 @@ RowVectorPtr makeDataInput(
input->getNullCount());
}

// Creates a PartitionIdGenerator if the table is partitioned, otherwise returns
// nullptr.
std::unique_ptr<PartitionIdGenerator> createPartitionIdGenerator(
const RowTypePtr& inputType,
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const ConnectorQueryCtx* connectorQueryCtx) {
auto partitionChannels = insertTableHandle->partitionChannels();
if (partitionChannels.empty()) {
return nullptr;
// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
std::vector<column_index_t> channels;

for (column_index_t i = 0; i < insertTableHandle->inputColumns().size();
i++) {
if (insertTableHandle->inputColumns()[i]->isPartitionKey()) {
channels.push_back(i);
}
}
return std::make_unique<PartitionIdGenerator>(
inputType,
partitionChannels,
hiveConfig->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties()),
connectorQueryCtx->memoryPool());

return channels;
}

// Collects, in input order, the indices of the input columns that are not
// partition keys.
std::vector<column_index_t> getNonPartitionChannels(
    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
  const auto& columns = insertTableHandle->inputColumns();

  std::vector<column_index_t> dataChannels;
  column_index_t channel = 0;
  for (const auto& column : columns) {
    if (!column->isPartitionKey()) {
      dataChannels.push_back(channel);
    }
    ++channel;
  }

  return dataChannels;
}

std::string makePartitionDirectory(
Expand All @@ -148,10 +159,6 @@ std::string makePartitionDirectory(
return tableDirectory;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames() {
return {
{LocationHandle::TableType::kNew, "kNew"},
Expand Down Expand Up @@ -418,52 +425,6 @@ std::string HiveBucketProperty::toString() const {
return out.str();
}

// Constructs the insert table handle: stores the supplied columns, location,
// storage format, bucketing, compression, serde parameters, writer options
// and file name generator, derives the partition / non-partition channel
// lists from the input columns, and then validates the compression kind and
// the ensureFiles constraints.
HiveInsertTableHandle::HiveInsertTableHandle(
    std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns,
    std::shared_ptr<const LocationHandle> locationHandle,
    dwio::common::FileFormat storageFormat,
    std::shared_ptr<const HiveBucketProperty> bucketProperty,
    std::optional<common::CompressionKind> compressionKind,
    const std::unordered_map<std::string, std::string>& serdeParameters,
    const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
    // With this option set, the HiveDataSink always writes a file, even when
    // there is no data. Useful for bucketed tables where the engine itself
    // guarantees a 1 to 1 mapping from task to bucket.
    const bool ensureFiles,
    std::shared_ptr<const FileNameGenerator> fileNameGenerator)
    : inputColumns_(std::move(inputColumns)),
      locationHandle_(std::move(locationHandle)),
      storageFormat_(storageFormat),
      bucketProperty_(std::move(bucketProperty)),
      compressionKind_(compressionKind),
      serdeParameters_(serdeParameters),
      writerOptions_(writerOptions),
      ensureFiles_(ensureFiles),
      fileNameGenerator_(std::move(fileNameGenerator)),
      partitionChannels_(computePartitionChannels(inputColumns_)),
      nonPartitionChannels_(computeNonPartitionChannels(inputColumns_)) {
  // CompressionKind_MAX is rejected as a compression type.
  if (compressionKind) {
    VELOX_CHECK(
        compressionKind.value() != common::CompressionKind_MAX,
        "Unsupported compression type: CompressionKind_MAX");
  }

  // The remaining checks only apply when files must always be written.
  if (!ensureFiles_) {
    return;
  }

  // With bucketing, or with partition keys present in the data, a task does
  // not map 1:1 to files, so writers cannot be created up front.
  VELOX_CHECK(
      bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0,
      "ensureFiles is not supported with bucketing");

  for (const auto& inputColumn : inputColumns_) {
    VELOX_CHECK(
        !inputColumn->isPartitionKey(),
        "ensureFiles is not supported with partition keys in the data");
  }
}

HiveDataSink::HiveDataSink(
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
Expand All @@ -482,13 +443,18 @@ HiveDataSink::HiveDataSink(
*insertTableHandle->bucketProperty(),
inputType)
: nullptr,
insertTableHandle->partitionChannels(),
insertTableHandle->nonPartitionChannels(),
createPartitionIdGenerator(
inputType,
insertTableHandle,
hiveConfig,
connectorQueryCtx)) {}
getPartitionChannels(insertTableHandle),
getNonPartitionChannels(insertTableHandle),
!getPartitionChannels(insertTableHandle).empty()
? std::make_unique<PartitionIdGenerator>(
inputType,
getPartitionChannels(insertTableHandle),
hiveConfig->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties()),
connectorQueryCtx->memoryPool(),
hiveConfig->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr) {}

HiveDataSink::HiveDataSink(
RowTypePtr inputType,
Expand Down Expand Up @@ -592,6 +558,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
// Compute partition and bucket numbers.
computePartitionAndBucketIds(input);

splitInputRowsAndEnsureWriters(input);

// All inputs belong to a single non-bucketed partition. The partition id
// must be zero.
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
Expand All @@ -600,8 +568,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
return;
}

splitInputRowsAndEnsureWriters();

for (auto index = 0; index < writers_.size(); ++index) {
const vector_size_t partitionSize = partitionSizes_[index];
if (partitionSize == 0) {
Expand Down Expand Up @@ -1014,7 +980,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {

std::optional<std::string> partitionName;
if (isPartitioned()) {
partitionName = getPartitionName(id.partitionId.value());
partitionName = getPartitionName(id);
}

// Without explicitly setting flush policy, the default memory based flush
Expand Down Expand Up @@ -1082,13 +1048,14 @@ std::unique_ptr<dwio::common::Writer> HiveDataSink::createWriterForIndex(
return maybeCreateBucketSortWriter(writerIndex, std::move(writer));
}

std::string HiveDataSink::getPartitionName(uint32_t partitionId) const {
VELOX_CHECK_NOT_NULL(partitionIdGenerator_);

return HivePartitionName::partitionName(
partitionId,
partitionIdGenerator_->partitionValues(),
partitionKeyAsLowerCase_);
std::optional<std::string> HiveDataSink::getPartitionName(
const HiveWriterId& id) const {
std::optional<std::string> partitionName;
if (isPartitioned()) {
partitionName =
partitionIdGenerator_->partitionName(id.partitionId.value());
}
return partitionName;
}

std::unique_ptr<facebook::velox::dwio::common::Writer>
Expand Down Expand Up @@ -1120,6 +1087,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortWriterFinishTimeSliceLimitMs_);
}

// Grows the partition-row bookkeeping by one slot: appends a zero to
// partitionSizes_ and a null entry to both partitionRows_ and
// rawPartitionRows_.
// NOTE(review): presumably invoked once per newly added writer so the three
// vectors stay index-aligned with writers_ -- confirm against the caller.
void HiveDataSink::extendBuffersForPartitionedTables() {
// Extends the buffer used for partition rows calculations.
// The new slot starts with a zero size and no row buffer attached.
partitionSizes_.emplace_back(0);
partitionRows_.emplace_back(nullptr);
rawPartitionRows_.emplace_back(nullptr);
}

HiveWriterId HiveDataSink::getWriterId(size_t row) const {
std::optional<int32_t> partitionId;
if (isPartitioned()) {
Expand Down Expand Up @@ -1152,7 +1126,7 @@ void HiveDataSink::updatePartitionRows(
++partitionSizes_[index];
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
Expand All @@ -1165,6 +1139,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
for (auto row = 0; row < numRows; ++row) {
const auto id = getWriterId(row);
const uint32_t index = ensureWriter(id);

updatePartitionRows(index, numRows, row);
}

Expand Down
Loading