Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions bolt/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,11 @@ void HashAggregation::initialize() {
operatorCtx_->operatorId(), rowbasedSpillMode);
}

if (isDistinct_) {
addRuntimeStat("isDistinctAggregation", RuntimeCounter(1));
for (auto i = 0; i < hashers.size(); ++i) {
identityProjections_.emplace_back(hashers[i]->channel(), i);
}
} else {
addRuntimeStat("isDistinctAggregation", RuntimeCounter(0));
}
addRuntimeStat("isDistinctAggregation", RuntimeCounter(isDistinct_));

for (auto i = 0; i < hashers.size(); ++i) {
identityProjections_.emplace_back(hashers[i]->channel(), i);
}
std::optional<column_index_t> groupIdChannel;
if (aggregationNode_->groupId().has_value()) {
groupIdChannel = outputType_->getChildIdxIfExists(
Expand Down
74 changes: 53 additions & 21 deletions bolt/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

#include "bolt/exec/HashProbe.h"
#include <common/time/Timer.h>
#include <core/PlanNode.h>
#include <core/QueryConfig.h>
#include <exec/Driver.h>
#include <exec/Operator.h>
#include <algorithm>
#include <unordered_map>
#include "bolt/exec/OperatorUtils.h"
#include "bolt/exec/Task.h"
Expand Down Expand Up @@ -179,11 +181,15 @@ void HashProbe::initialize() {

size_t numIdentityProjections = 0;
for (auto i = 0; i < probeType_->size(); ++i) {
auto name = probeType_->nameOf(i);
auto& name = probeType_->nameOf(i);
auto outIndex = outputType_->getChildIdxIfExists(name);
if (outIndex.has_value()) {
identityProjections_.emplace_back(i, outIndex.value());
if (outIndex.value() == i) {
if (!outIndex.has_value()) {
continue;
}
projectedInputColumns_.emplace_back(i, *outIndex);
if (!isRightJoin(joinType_) && !isFullJoin(joinType_)) {
identityProjections_.emplace_back(i, *outIndex);
if (*outIndex == i) {
++numIdentityProjections;
}
}
Expand Down Expand Up @@ -406,7 +412,8 @@ void HashProbe::asyncWaitForHashTable() {
}
} else if (
(isInnerJoin(joinType_) || isLeftSemiFilterJoin(joinType_) ||
isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) &&
isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_) ||
isRightJoin(joinType_)) &&
table_->hashMode() != BaseHashTable::HashMode::kHash && !isSpillInput() &&
!hasMoreSpillData()) {
// Find out whether there are any upstream operators that can accept
Expand Down Expand Up @@ -624,7 +631,8 @@ void HashProbe::clearDynamicFilters() {
// * hash table has a single key with unique values,
// * build side has no dependent columns.
if (keyChannels_.size() == 1 && !table_->hasDuplicateKeys() &&
tableOutputProjections_.empty() && !filter_ && !dynamicFilters_.empty()) {
tableOutputProjections_.empty() && !filter_ && !dynamicFilters_.empty() &&
!isRightJoin(joinType_)) {
canReplaceWithDynamicFilter_ = true;
}

Expand Down Expand Up @@ -654,6 +662,9 @@ void HashProbe::addInput(RowVectorPtr input) {
}
input_ = std::move(input);

// Reset passingInputRowsInitialized_ as input_ has changed.
passingInputRowsInitialized_ = false;

const auto numInput = input_->size();

if (numInput > 0) {
Expand Down Expand Up @@ -765,7 +776,7 @@ void HashProbe::addInput(RowVectorPtr input) {
void HashProbe::prepareOutput(vector_size_t size) {
// Try to re-use memory for the output vectors that contain build-side data.
// We expect output vectors containing probe-side data to be null (reset in
// clearIdentityProjectedOutput). BaseVector::prepareForReuse keeps null
// clearProjectedOutput(). BaseVector::prepareForReuse keeps null
// children unmodified and makes non-null (build side) children reusable.
if (output_) {
VectorPtr output = std::move(output_);
Expand Down Expand Up @@ -843,12 +854,12 @@ void HashProbe::fillLeftSemiProjectMatchColumn(vector_size_t size) {

void HashProbe::fillOutput(vector_size_t size) {
prepareOutput(size);
for (auto projection : identityProjections_) {
for (auto projection : projectedInputColumns_) {
ensureLoadedIfNotAtEnd(projection.inputChannel);
}

wrapIndirectChildren(
identityProjections_,
projectedInputColumns_,
input_->children(),
size,
outputRowMapping_,
Expand Down Expand Up @@ -956,7 +967,7 @@ RowVectorPtr HashProbe::getBuildSideOutput() {
prepareOutput(numOut);

// Populate probe-side columns of the output with nulls.
for (auto projection : identityProjections_) {
for (auto projection : projectedInputColumns_) {
output_->childAt(projection.outputChannel) = BaseVector::createNullConstant(
outputType_->childAt(projection.outputChannel), numOut, pool());
}
Expand Down Expand Up @@ -988,11 +999,11 @@ RowVectorPtr HashProbe::getBuildSideOutput() {
return output_;
}

void HashProbe::clearIdentityProjectedOutput() {
void HashProbe::clearProjectedOutput() {
if (!output_ || !output_.unique()) {
return;
}
for (auto& projection : identityProjections_) {
for (auto& projection : projectedInputColumns_) {
output_->childAt(projection.outputChannel) = nullptr;
}
}
Expand Down Expand Up @@ -1066,7 +1077,7 @@ RowVectorPtr HashProbe::getOutput() {
}
checkRunning();

clearIdentityProjectedOutput();
clearProjectedOutput();
if (!input_) {
if (!hasMoreInput()) {
if (needLastProbe() && lastProber_) {
Expand Down Expand Up @@ -1204,7 +1215,22 @@ RowVectorPtr HashProbe::getOutput() {
void HashProbe::fillFilterInput(vector_size_t size) {
std::vector<VectorPtr> filterColumns(filterInputType_->size());
for (auto projection : filterInputProjections_) {
ensureLoadedIfNotAtEnd(projection.inputChannel);
if (std::any_of(
projectedInputColumns_.begin(),
projectedInputColumns_.end(),
[&](const auto& p) {
return p.inputChannel == projection.inputChannel;
})) {
// If the column is projected to the output, ensure it's loaded if it's
// lazy in case the filter only loads an incomplete subset of the rows
// that will be output.
ensureLoaded(projection.inputChannel);
} else {
// If the column isn't projected to the output, the Vector will only be
// reused if we've broken the input batch into multiple output batches,
// i.e. if results_ is not at the end of the iterator.
ensureLoadedIfNotAtEnd(projection.inputChannel);
}
}

wrapIndirectChildren(
Expand Down Expand Up @@ -1616,19 +1642,25 @@ void HashProbe::ensureLoadedIfNotAtEnd(column_index_t channel) {
auto inputChild = input_->childAt(channel);
bool forceLoaded = input_->containsLazyNotLoaded() &&
isLazyNotLoaded(*inputChild) && inputChild->containingLazyAndWrapped();
if (!forceLoaded &&
((!filter_ &&
(isLeftSemiFilterJoin(joinType_) || isLeftSemiProjectJoin(joinType_) ||
isAntiJoin(joinType_))) ||
results_.atEnd())) {
if (!forceLoaded && results_.atEnd()) {
return;
}

ensureLoaded(channel, forceLoaded);
}

void HashProbe::ensureLoaded(column_index_t channel, bool forceLoaded) {
if (!forceLoaded &&
(!filter_ &&
(isLeftSemiFilterJoin(joinType_) || isLeftSemiProjectJoin(joinType_) ||
isAntiJoin(joinType_)))) {
return;
}
if (!passingInputRowsInitialized_) {
passingInputRowsInitialized_ = true;
passingInputRows_.resize(input_->size());
if (isLeftJoin(joinType_) || isFullJoin(joinType_) ||
isLeftSemiProjectJoin(joinType_)) {
isLeftSemiProjectJoin(joinType_) || isAntiJoin(joinType_)) {
passingInputRows_.setAll();
} else {
passingInputRows_.clearAll();
Expand All @@ -1643,7 +1675,7 @@ void HashProbe::ensureLoadedIfNotAtEnd(column_index_t channel) {
passingInputRows_.updateBounds();
}

LazyVector::ensureLoadedRows(inputChild, passingInputRows_);
LazyVector::ensureLoadedRows(input_->childAt(channel), passingInputRows_);
}

void HashProbe::noMoreInput() {
Expand Down
9 changes: 7 additions & 2 deletions bolt/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class HashProbe : public Operator {
// number mappings or input vectors. In this way input vectors do
// not have to be copied and will be singly referenced by their
// producer.
void clearIdentityProjectedOutput();
void clearProjectedOutput();

// Populate output columns with matching build-side rows
// for the right semi join and non-matching build-side rows
Expand Down Expand Up @@ -179,6 +179,8 @@ class HashProbe : public Operator {

void ensureLoadedIfNotAtEnd(column_index_t channel);

void ensureLoaded(column_index_t channel, bool forceLoaded = false);

// Indicates if the operator has more probe inputs from either the upstream
// operator or the spill input reader.
bool hasMoreInput() const;
Expand Down Expand Up @@ -356,6 +358,9 @@ class HashProbe : public Operator {
// Type of the RowVector for filter inputs.
RowTypePtr filterInputType_;

// The input channels that are projected to the output.
std::vector<IdentityProjection> projectedInputColumns_;

// Maps input channels to channels in 'filterInputType_'.
std::vector<IdentityProjection> filterInputProjections_;

Expand Down Expand Up @@ -600,7 +605,7 @@ class HashProbe : public Operator {
SelectivityVector activeRows_;

// True if passingInputRows_ is up to date.
bool passingInputRowsInitialized_;
bool passingInputRowsInitialized_{false};

// Set of input rows for which there is at least one join hit. All
// set if right side optional. Used when loading lazy vectors for
Expand Down
Loading