Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ DEBUG_ONLY_TEST_P(
folly::EventCount taskPauseWait;
auto taskPauseWaitKey = taskPauseWait.prepareWait();

const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);

std::atomic<bool> injectAllocationOnce{true};
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {
Expand Down
9 changes: 9 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -3004,6 +3004,15 @@ class AbstractJoinNode : public PlanNode {
return isInnerJoin() || isLeftJoin() || isAntiJoin();
}

// Indicates if this joinNode can drop duplicate rows with same join key.
// For left semi and anti join, it is not necessary to store duplicate rows.
bool canDropDuplicates() const {
  // A match-only join (left semi filter/project or anti) with no extra
  // filter just needs to know whether any build row matches; entries with
  // duplicate keys carry no additional information.
  const bool matchOnlyJoin =
      isLeftSemiFilterJoin() || isLeftSemiProjectJoin() || isAntiJoin();
  return matchOnlyJoin && filter() == nullptr;
}

const std::vector<FieldAccessTypedExprPtr>& leftKeys() const {
return leftKeys_;
}
Expand Down
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ class QueryConfig {
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
"abandon_partial_topn_row_number_min_pct";

static constexpr const char* kAbandonBuildNoDupHashMinRows =
"abandon_build_no_dup_hash_min_rows";

static constexpr const char* kAbandonBuildNoDupHashMinPct =
"abandon_build_no_dup_hash_min_pct";

static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
"max_elements_size_in_repeat_and_sequence";

Expand Down Expand Up @@ -760,6 +766,14 @@ class QueryConfig {
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
}

// Number of input rows to receive before starting to check whether to
// abandon building a no-duplicates hash table in HashBuild. Defaults to
// 100'000 rows.
int32_t abandonBuildNoDupHashMinRows() const {
  static constexpr int32_t kDefaultMinRows = 100'000;
  return get<int32_t>(kAbandonBuildNoDupHashMinRows, kDefaultMinRows);
}

// Percentage threshold of distinct keys at which building a no-duplicates
// hash table is abandoned. The default of 0 disables the optimization.
int32_t abandonBuildNoDupHashMinPct() const {
  static constexpr int32_t kDefaultMinPct = 0;
  return get<int32_t>(kAbandonBuildNoDupHashMinPct, kDefaultMinPct);
}

// Returns the configured max elements size for the repeat and sequence
// functions; defaults to 10'000.
int32_t maxElementsSizeInRepeatAndSequence() const {
  static constexpr int32_t kDefaultMaxElements = 10'000;
  return get<int32_t>(
      kMaxElementsSizeInRepeatAndSequence, kDefaultMaxElements);
}
Expand Down
10 changes: 10 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ Generic Configuration
- integer
- 80
- Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
* - abandon_build_no_dup_hash_min_rows
- integer
- 100,000
- Number of input rows to receive before starting to check whether to abandon building a HashTable without
duplicates in HashBuild for left semi/anti join.
* - abandon_build_no_dup_hash_min_pct
- integer
- 0
- Abandons building a HashTable without duplicates in HashBuild for left semi/anti join if the percentage of
distinct keys in the HashTable exceeds this threshold. Zero means 'disable this optimization'.
* - session_timezone
- string
-
Expand Down
87 changes: 65 additions & 22 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ HashBuild::HashBuild(
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())),
keyChannelMap_(joinNode_->rightKeys().size()) {
dropDuplicates_(joinNode_->canDropDuplicates()),
keyChannelMap_(joinNode_->rightKeys().size()),
abandonBuildNoDupHashMinRows_(
driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
abandonBuildNoDupHashMinPct_(
driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

Expand All @@ -86,19 +91,22 @@ HashBuild::HashBuild(

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
if (numDependents > 0) {
// Number of join keys (numKeys) may be less then number of input columns
// (inputType->size()). In this case numDependents is negative and cannot be
// used to call 'reserve'. This happens when we join different probe side
// keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
// u.k AND t.k2 = u.k.
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
if (!dropDuplicates_) {
if (numDependents > 0) {
// Number of join keys (numKeys) may be less then number of input columns
// (inputType->size()). In this case numDependents is negative and cannot
// be used to call 'reserve'. This happens when we join different probe
// side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
// t.k1 = u.k AND t.k2 = u.k.
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}

for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
}
}
}

Expand Down Expand Up @@ -146,11 +154,6 @@ void HashBuild::setupTable() {
.minTableRowsForParallelJoinBuild(),
pool());
} else {
// (Left) semi and anti join with no extra filter only needs to know whether
// there is a match. Hence, no need to store entries with duplicate keys.
const bool dropDuplicates = !joinNode_->filter() &&
(joinNode_->isLeftSemiFilterJoin() ||
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
Expand All @@ -159,7 +162,7 @@ void HashBuild::setupTable() {
table_ = HashTable<false>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
Expand All @@ -170,15 +173,22 @@ void HashBuild::setupTable() {
table_ = HashTable<true>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
pool());
}
}
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
if (abandonBuildNoDupHashMinPct_ == 0) {
// Building a HashTable without duplicates is disabled if
// abandonBuildNoDupHashMinPct_ is 0.
abandonBuildNoDupHash_ = true;
table_->joinTableMayHaveDuplicates();
}
}

void HashBuild::setupSpiller(SpillPartition* spillPartition) {
Expand Down Expand Up @@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
return;
}

if (dropDuplicates_ && !abandonBuildNoDupHash_) {
const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
numHashInputRows_ += activeRows_.countSelected();
if (abandonEarly) {
// The hash table is no longer directly constructed in addInput. The data
// that was previously inserted into the hash table is already in the
// RowContainer.
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
abandonBuildNoDupHash_ = true;
table_->joinTableMayHaveDuplicates();
} else {
table_->prepareForGroupProbe(
*lookup_,
input,
activeRows_,
BaseHashTable::kNoSpillInputStartPartitionBit);
if (lookup_->rows.empty()) {
return;
}
table_->groupProbe(
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
return;
}
}

if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
hashes_.resize(activeRows_.end());
}
Expand Down Expand Up @@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit,
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
: nullptr);
: nullptr,
dropDuplicates_);
}
stats_.wlock()->addRuntimeStat(
BaseHashTable::kBuildWallNanos,
Expand Down Expand Up @@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
setupTable();
setupSpiller(spillInput.spillPartition.get());
stateCleared_ = false;
numHashInputRows_ = 0;

// Start to process spill input.
processSpillInput();
Expand Down Expand Up @@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
rows.data(), rows.size(), false, false, result->childAt(types.size()));
}
}

// Decides whether to give up building a no-duplicates hash table while
// adding input (left semi/anti joins that drop duplicate rows).
//
// Returns true once more than 'abandonBuildNoDupHashMinRows_' input rows
// have been seen and the percentage of distinct keys meets or exceeds
// 'abandonBuildNoDupHashMinPct_': a mostly-distinct build side gains little
// from de-duplication, so the early table build is not worthwhile.
bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
  VELOX_CHECK(dropDuplicates_);
  if (numHashInputRows_ <= abandonBuildNoDupHashMinRows_) {
    return false;
  }
  // Compare the distinct-key percentage against the threshold. The original
  // expression 'numDistinct / numHashInputRows_ >=
  // abandonBuildNoDupHashMinPct_ / 100' truncated both sides to 0 under
  // integer division (numDistinct <= numHashInputRows_, and any pct in
  // (0, 100] divided by 100 is 0), which made the pct threshold a no-op.
  // Scale the ratio by 100 before dividing instead.
  return numDistinct * 100 / numHashInputRows_ >= abandonBuildNoDupHashMinPct_;
}
} // namespace facebook::velox::exec
29 changes: 29 additions & 0 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class HashBuild final : public Operator {
// not.
bool nonReclaimableState() const;

// True if we have seen enough rows and few enough duplicate join keys to
// give up building a no-duplicates hash table: more than
// 'abandonBuildNoDupHashMinRows_' rows, with at least
// 'abandonBuildNoDupHashMinPct_' percent of them unique.
bool abandonBuildNoDupHashEarly(int64_t numDistinct) const;

const std::shared_ptr<const core::HashJoinNode> joinNode_;

const core::JoinType joinType_;
Expand Down Expand Up @@ -242,6 +247,9 @@ class HashBuild final : public Operator {
// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;

// Used for building hash table while adding input rows.
std::unique_ptr<HashLookup> lookup_;

// Key channels in 'input_'
std::vector<column_index_t> keyChannels_;

Expand Down Expand Up @@ -269,6 +277,14 @@ class HashBuild final : public Operator {
// at least one entry with null join keys.
bool joinHasNullKeys_{false};

// Indicates whether to drop duplicate rows. Rows containing duplicate keys
// can be removed for left semi and anti joins.
const bool dropDuplicates_;

// Whether to abandon building a HashTable without duplicates in HashBuild
// addInput phase for left semi/anti join.
bool abandonBuildNoDupHash_{false};

// The type used to spill hash table which might attach a boolean column to
// record the probed flag if 'needProbedFlagSpill_' is true.
RowTypePtr spillType_;
Expand Down Expand Up @@ -310,6 +326,19 @@ class HashBuild final : public Operator {

// Maps key channel in 'input_' to channel in key.
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;

// Counts the number of hash table input rows while building a no-duplicates
// hash table. Not updated after 'abandonBuildNoDupHash_' becomes true.
int64_t numHashInputRows_ = 0;

// Minimum number of rows to see before deciding whether to give up building
// a no-duplicates hash table.
const int32_t abandonBuildNoDupHashMinRows_;

// Minimum percentage of unique rows at which building a no-duplicates hash
// table is abandoned. If at least this percentage of rows are unique,
// building the hash table in the addInput phase is not worthwhile.
const int32_t abandonBuildNoDupHashMinPct_;
};

inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,18 @@ RowTypePtr hashJoinTableType(
types.emplace_back(inputType->childAt(channel));
}

if (joinNode->canDropDuplicates()) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
return ROW(std::move(names), std::move(types));
}

for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelSet.find(i) == keyChannelSet.end()) {
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}

return ROW(std::move(names), std::move(types));
}

Expand Down
24 changes: 22 additions & 2 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ HashTable<ignoreNullKeys>::HashTable(
pool_(pool),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild),
joinBuildNoDuplicates_(!allowDuplicates),
buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
std::vector<TypePtr> keys;
for (auto& hasher : hashers_) {
Expand Down Expand Up @@ -1487,7 +1488,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
return;
}
disableRangeArrayHash_ |= disableRangeArrayHash;
if (numDistinct_ && !isJoinBuild_) {
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
// If the join type is left semi and anti, allowDuplicates_ will be false,
// and join build is building hash table while adding input rows.
if (!analyze()) {
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
return;
Expand Down Expand Up @@ -1709,8 +1712,20 @@ template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareJoinTable(
std::vector<std::unique_ptr<BaseHashTable>> tables,
int8_t spillInputStartPartitionBit,
folly::Executor* executor) {
folly::Executor* executor,
bool dropDuplicates) {
buildExecutor_ = executor;
if (dropDuplicates) {
if (table_ != nullptr) {
// Reset table_ and capacity_ to trigger rehash.
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;
capacity_ = 0;
}
// Call analyze to insert all unique values in row container to the
// table hashers' uniqueValues_;
analyze();
}
otherTables_.reserve(tables.size());
for (auto& table : tables) {
otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>(
Expand Down Expand Up @@ -1740,6 +1755,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
if (useValueIds) {
for (auto& other : otherTables_) {
if (dropDuplicates) {
// Before merging with the current hashers, all values in the row
// containers of other table need to be inserted into uniqueValues_.
other->analyze();
}
for (auto i = 0; i < hashers_.size(); ++i) {
hashers_[i]->merge(*other->hashers_[i]);
if (!hashers_[i]->mayUseValueIds()) {
Expand Down
Loading