Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ option(VELOX_ENABLE_HIVE_CONNECTOR "Build Hive connector." ON)
option(VELOX_ENABLE_TPCH_CONNECTOR "Build TPC-H connector." ON)
option(VELOX_ENABLE_TPCDS_CONNECTOR "Build TPC-DS connector." ON)
option(VELOX_ENABLE_PRESTO_FUNCTIONS "Build Presto SQL functions." ON)
option(VELOX_ENABLE_SPARK_FUNCTIONS "Build Spark SQL functions." ON)
option(VELOX_ENABLE_SPARK_FUNCTIONS "Build Spark SQL functions." OFF)
option(VELOX_ENABLE_ICEBERG_FUNCTIONS "Build Iceberg functions." ON)
option(VELOX_ENABLE_EXPRESSION "Build expression." ON)
option(
Expand Down
118 changes: 95 additions & 23 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,52 @@ HiveDataSource::HiveDataSource(
}
}

std::vector<std::string> outputColumnHandleNames;
std::vector<TypePtr> outputColumnTypes;

std::vector<std::string> readColumnNames;
auto readColumnTypes = outputType_->children();
for (const auto& outputName : outputType_->names()) {
auto it = columnHandles.find(outputName);
std::vector<TypePtr> readColumnTypes; // = outputType_->children();

for (int i = 0; i < outputType_->size(); ++i) {
auto columnName = outputType_->nameOf(i);
auto& columnType = outputType_->childAt(i);

auto originalColumnName = columnName;
if (columnName.ends_with("_upcast")) {
originalColumnName =
columnName.substr(0, columnName.size() - strlen("_upcast"));
}

// Get the ColumnHandle name. This is the name without aliasing. e.g.
// originalColumnName="order_id_21", and columnHandleName="order_id"
auto it = columnHandles.find(originalColumnName);
VELOX_CHECK(
it != columnHandles.end(),
"ColumnHandle is missing for output column: {}",
outputName);

columnName);
auto* handle = static_cast<const HiveColumnHandle*>(it->second.get());
readColumnNames.push_back(handle->name());
auto columnHandleName = handle->name();

if (columnName.ends_with("_upcast")) {
columnHandleName += "_upcast";
}

outputColumnHandleNames.push_back(columnHandleName);
outputColumnTypes.push_back(columnType);

if (!columnName.ends_with("_upcast")) {
readColumnNames.push_back(columnHandleName);
readColumnTypes.push_back(columnType);
}
for (auto& subfield : handle->requiredSubfields()) {
VELOX_USER_CHECK_EQ(
getColumnName(subfield),
handle->name(),
"Required subfield does not match column name");
subfields_[handle->name()].push_back(&subfield);
subfields_[columnHandleName].push_back(&subfield);
}
}
outputTypeWithColumnHandleNames_ = ROW(outputColumnHandleNames, outputColumnTypes);

hiveTableHandle_ =
std::dynamic_pointer_cast<const HiveTableHandle>(tableHandle);
Expand Down Expand Up @@ -155,7 +182,7 @@ HiveDataSource::HiveDataSource(
}
// Remaining filter may reference columns that are not used otherwise,
// e.g. are not being projected out and are not used in range filters.
// Make sure to add these columns to readerOutputType_.
// Make sure to add these columns to readerOutputTypeWithoutUpcasts_.
readColumnNames.push_back(input->field());
readColumnTypes.push_back(input->type());
}
Expand All @@ -180,10 +207,11 @@ HiveDataSource::HiveDataSource(
}
}

readerOutputType_ =
// NO upcast columns
readerOutputTypeWithoutUpcasts_ =
ROW(std::move(readColumnNames), std::move(readColumnTypes));
scanSpec_ = makeScanSpec(
readerOutputType_,
readerOutputTypeWithoutUpcasts_,
subfields_,
filters_,
hiveTableHandle_->dataColumns(),
Expand All @@ -209,7 +237,7 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
&partitionKeys_,
connectorQueryCtx_,
hiveConfig_,
readerOutputType_,
readerOutputTypeWithoutUpcasts_,
ioStats_,
fsStats_,
fileHandleFactory_,
Expand All @@ -233,11 +261,12 @@ std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
if (subfields_.erase(handle->name()) > 0) {
rebuildScanSpec = true;
}
auto index = readerOutputType_->getChildIdxIfExists(handle->name());
auto index =
readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(handle->name());
if (!index.has_value()) {
if (names.empty()) {
names = readerOutputType_->names();
types = readerOutputType_->children();
names = readerOutputTypeWithoutUpcasts_->names();
types = readerOutputTypeWithoutUpcasts_->children();
}
index = names.size();
names.push_back(handle->name());
Expand All @@ -248,11 +277,11 @@ std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
bucketChannels.push_back(*index);
}
if (!names.empty()) {
readerOutputType_ = ROW(std::move(names), std::move(types));
readerOutputTypeWithoutUpcasts_ = ROW(std::move(names), std::move(types));
}
if (rebuildScanSpec) {
auto newScanSpec = makeScanSpec(
readerOutputType_,
readerOutputTypeWithoutUpcasts_,
subfields_,
filters_,
hiveTableHandle_->dataColumns(),
Expand All @@ -274,7 +303,8 @@ void HiveDataSource::setupRowIdColumn() {
auto* rowId = scanSpec_->childByName(*specialColumns_.rowId);
VELOX_CHECK_NOT_NULL(rowId);
auto& rowIdType =
readerOutputType_->findChild(*specialColumns_.rowId)->asRow();
readerOutputTypeWithoutUpcasts_->findChild(*specialColumns_.rowId)
->asRow();
auto rowGroupId = split_->getFileName();
rowId->childByName(rowIdType.nameOf(1))
->setConstantValue<StringView>(
Expand Down Expand Up @@ -321,7 +351,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
readerOutputType_ = splitReader_->readerOutputType();
readerOutputTypeWithoutUpcasts_ = splitReader_->readerOutputType();
}

std::optional<RowVectorPtr> HiveDataSource::next(
Expand All @@ -341,13 +371,54 @@ std::optional<RowVectorPtr> HiveDataSource::next(
// Bucket conversion or delta update could add extra column to reader output.
auto needsExtraColumn = [&] {
return output_->asUnchecked<RowVector>()->childrenSize() <
readerOutputType_->size();
readerOutputTypeWithoutUpcasts_->size();
};
if (!output_ || needsExtraColumn()) {
output_ = BaseVector::create(readerOutputType_, 0, pool_);
output_ = BaseVector::create(readerOutputTypeWithoutUpcasts_, 0, pool_);
}

const auto rowsScanned = splitReader_->next(size, output_);

if (outputTypeWithColumnHandleNames_->size() > readerOutputTypeWithoutUpcasts_->size()) {
// find the upcast columns and add them to output_
std::vector<VectorPtr> outputColumns;
outputColumns.reserve(outputType_->size());

for (int i = 0; i < outputTypeWithColumnHandleNames_->size(); ++i) {
const auto& columnName = outputTypeWithColumnHandleNames_->nameOf(i);
if (columnName.ends_with("_upcast")) {
auto originalOutputName =
columnName.substr(0, columnName.size() - strlen("_upcast"));

auto index =
readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(originalOutputName);
VELOX_CHECK(index.has_value());
auto originalColumn =
output_->asUnchecked<RowVector>()->childAt(*index);

auto casted = BaseVector::create(
outputTypeWithColumnHandleNames_->childAt(i), originalColumn->size(), pool_);
casted->copy(originalColumn.get(), 0, 0, originalColumn->size());
outputColumns.push_back(casted);
} else {
auto index = readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(columnName);
VELOX_CHECK(index.has_value());
auto originalColumn =
output_->asUnchecked<RowVector>()->childAt(*index);
outputColumns.push_back(originalColumn);
}
}

outputWithUpcasts_ = std::make_shared<RowVector>(
pool_,
outputType_,
BufferPtr(nullptr),
output_->size(),
std::move(outputColumns));
} else {
outputWithUpcasts_ = output_;
}

completedRows_ += rowsScanned;
if (rowsScanned == 0) {
splitReader_->updateRuntimeStats(runtimeStats_);
Expand All @@ -363,7 +434,7 @@ std::optional<RowVectorPtr> HiveDataSource::next(
return getEmptyOutput();
}

auto rowVector = std::dynamic_pointer_cast<RowVector>(output_);
auto rowVector = std::dynamic_pointer_cast<RowVector>(outputWithUpcasts_);

// In case there is a remaining filter that excludes some but not all
// rows, collect the indices of the passing rows. If there is no filter,
Expand Down Expand Up @@ -483,7 +554,8 @@ void HiveDataSource::setFromDataSource(
runtimeStats_.skippedSplits += source->runtimeStats_.skippedSplits;
runtimeStats_.processedSplits += source->runtimeStats_.processedSplits;
runtimeStats_.skippedSplitBytes += source->runtimeStats_.skippedSplitBytes;
readerOutputType_ = std::move(source->readerOutputType_);
readerOutputTypeWithoutUpcasts_ =
std::move(source->readerOutputTypeWithoutUpcasts_);
source->scanSpec_->moveAdaptationFrom(*scanSpec_);
scanSpec_ = std::move(source->scanSpec_);
splitReader_ = std::move(source->splitReader_);
Expand Down Expand Up @@ -541,7 +613,7 @@ std::shared_ptr<wave::WaveDataSource> HiveDataSource::toWaveDataSource() {
waveDataSource_ = waveDelegateHook_(
hiveTableHandle_,
scanSpec_,
readerOutputType_,
readerOutputTypeWithoutUpcasts_,
&partitionKeys_,
fileHandleFactory_,
ioExecutor_,
Expand Down
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class HiveDataSource : public DataSource {
HiveTableHandlePtr hiveTableHandle_;
std::shared_ptr<common::ScanSpec> scanSpec_;
VectorPtr output_;
VectorPtr outputWithUpcasts_;
std::unique_ptr<SplitReader> splitReader_;

// Output type from file reader. This is different from outputType_ that it
Expand Down Expand Up @@ -148,7 +149,14 @@ class HiveDataSource : public DataSource {
}

// The row type for the data source output, not including filter-only columns
// May be aliased, e.g. (order_id_21, order_id_21_upcast). Does not include
// filter only columns
const RowTypePtr outputType_;
// Same as outputType_ but the column names are the ColumnHandle names
RowTypePtr outputTypeWithColumnHandleNames_;
RowTypePtr
readerOutputTypeWithoutUpcasts_; // The ColumnHandle name, e.g. order_id

core::ExpressionEvaluator* const expressionEvaluator_;

// Column handles for the Split info columns keyed on their column names.
Expand Down
9 changes: 9 additions & 0 deletions velox/core/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ class FieldAccessTypedExpr : public ITypedExpr {
const std::string& name() const {
return name_;
}
//
// void setName(const std::string& newName) {
// name_ = newName;
// }
//
// void updateNewType(const std::string& newName, TypePtr newType) {
// name_ = newName;
// type_= newType;
// }

TypedExprPtr rewriteInputNames(
const std::unordered_map<std::string, TypedExprPtr>& mapping)
Expand Down
8 changes: 8 additions & 0 deletions velox/core/ITypedExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,18 @@ class ITypedExpr : public ISerializable {
ExprKind kind() const {
return kind_;
}
//
// virtual const std::string& name() const {
// VELOX_UNSUPPORTED("name() is not supported for this expression");
// }

const TypePtr& type() const {
return type_;
}
//
// void setType(const TypePtr& newType) {
// type_.reset(newType.get());
// }

const std::vector<TypedExprPtr>& inputs() const {
return inputs_;
Expand Down
Loading