Skip to content

Commit 73ee99f

Browse files
liujiayi771zhouyuan
authored andcommitted
Stream input row to hash table when addInput for left semi and anti join
Address comments disable by default
1 parent b263d9d commit 73ee99f

File tree

12 files changed

+705
-30
lines changed

12 files changed

+705
-30
lines changed

velox/common/memory/tests/SharedArbitratorTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ DEBUG_ONLY_TEST_P(
755755
folly::EventCount taskPauseWait;
756756
auto taskPauseWaitKey = taskPauseWait.prepareWait();
757757

758-
const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
758+
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
759759

760760
std::atomic<bool> injectAllocationOnce{true};
761761
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {

velox/core/PlanNode.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3004,6 +3004,15 @@ class AbstractJoinNode : public PlanNode {
30043004
return isInnerJoin() || isLeftJoin() || isAntiJoin();
30053005
}
30063006

3007+
// Indicates if this joinNode can drop duplicate rows with same join key.
3008+
// For left semi and anti join, it is not necessary to store duplicate rows.
3009+
bool canDropDuplicates() const {
3010+
// Left semi and anti join with no extra filter only needs to know whether
3011+
// there is a match. Hence, no need to store entries with duplicate keys.
3012+
return !filter() &&
3013+
(isLeftSemiFilterJoin() || isLeftSemiProjectJoin() || isAntiJoin());
3014+
}
3015+
30073016
const std::vector<FieldAccessTypedExprPtr>& leftKeys() const {
30083017
return leftKeys_;
30093018
}

velox/core/QueryConfig.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ class QueryConfig {
181181
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
182182
"abandon_partial_topn_row_number_min_pct";
183183

184+
static constexpr const char* kAbandonBuildNoDupHashMinRows =
185+
"abandon_build_no_dup_hash_min_rows";
186+
187+
static constexpr const char* kAbandonBuildNoDupHashMinPct =
188+
"abandon_build_no_dup_hash_min_pct";
189+
184190
static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
185191
"max_elements_size_in_repeat_and_sequence";
186192

@@ -760,6 +766,14 @@ class QueryConfig {
760766
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
761767
}
762768

769+
// Number of input rows HashBuild must receive before it starts checking
// whether to abandon building a hash table without duplicates for left
// semi/anti join. Reads 'abandon_build_no_dup_hash_min_rows'; defaults to
// 100'000.
int32_t abandonBuildNoDupHashMinRows() const {
770+
return get<int32_t>(kAbandonBuildNoDupHashMinRows, 100'000);
771+
}
772+
773+
// Percentage of distinct keys at which HashBuild abandons building a hash
// table without duplicates for left semi/anti join. Reads
// 'abandon_build_no_dup_hash_min_pct'; defaults to 0, which HashBuild treats
// as "optimization disabled".
int32_t abandonBuildNoDupHashMinPct() const {
774+
return get<int32_t>(kAbandonBuildNoDupHashMinPct, 0);
775+
}
776+
763777
int32_t maxElementsSizeInRepeatAndSequence() const {
764778
return get<int32_t>(kMaxElementsSizeInRepeatAndSequence, 10'000);
765779
}

velox/docs/configs.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ Generic Configuration
4343
- integer
4444
- 80
4545
- Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
46+
* - abandon_build_no_dup_hash_min_rows
47+
- integer
48+
- 100,000
49+
- Number of input rows to receive before starting to check whether to abandon building a HashTable without
50+
duplicates in HashBuild for left semi/anti join.
51+
* - abandon_build_no_dup_hash_min_pct
52+
- integer
53+
- 0
54+
- Abandons building a HashTable without duplicates in HashBuild for left semi/anti join if the percentage of
55+
distinct keys in the HashTable equals or exceeds this threshold. Zero means 'disable this optimization'.
4656
* - session_timezone
4757
- string
4858
-

velox/exec/HashBuild.cpp

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ HashBuild::HashBuild(
6666
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
6767
operatorCtx_->driverCtx()->splitGroupId,
6868
planNodeId())),
69-
keyChannelMap_(joinNode_->rightKeys().size()) {
69+
dropDuplicates_(joinNode_->canDropDuplicates()),
70+
keyChannelMap_(joinNode_->rightKeys().size()),
71+
abandonBuildNoDupHashMinRows_(
72+
driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
73+
abandonBuildNoDupHashMinPct_(
74+
driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
7075
VELOX_CHECK(pool()->trackUsage());
7176
VELOX_CHECK_NOT_NULL(joinBridge_);
7277

@@ -86,19 +91,22 @@ HashBuild::HashBuild(
8691

8792
// Identify the non-key build side columns and make a decoder for each.
8893
const int32_t numDependents = inputType->size() - numKeys;
89-
if (numDependents > 0) {
90-
// Number of join keys (numKeys) may be less then number of input columns
91-
// (inputType->size()). In this case numDependents is negative and cannot be
92-
// used to call 'reserve'. This happens when we join different probe side
93-
// keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
94-
// u.k AND t.k2 = u.k.
95-
dependentChannels_.reserve(numDependents);
96-
decoders_.reserve(numDependents);
97-
}
98-
for (auto i = 0; i < inputType->size(); ++i) {
99-
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
100-
dependentChannels_.emplace_back(i);
101-
decoders_.emplace_back(std::make_unique<DecodedVector>());
94+
if (!dropDuplicates_) {
95+
if (numDependents > 0) {
96+
// Number of join keys (numKeys) may be greater than the number of input columns
97+
// (inputType->size()). In this case numDependents is negative and cannot
98+
// be used to call 'reserve'. This happens when we join different probe
99+
// side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
100+
// t.k1 = u.k AND t.k2 = u.k.
101+
dependentChannels_.reserve(numDependents);
102+
decoders_.reserve(numDependents);
103+
}
104+
105+
for (auto i = 0; i < inputType->size(); ++i) {
106+
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
107+
dependentChannels_.emplace_back(i);
108+
decoders_.emplace_back(std::make_unique<DecodedVector>());
109+
}
102110
}
103111
}
104112

@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
146154
.minTableRowsForParallelJoinBuild(),
147155
pool());
148156
} else {
149-
// (Left) semi and anti join with no extra filter only needs to know whether
150-
// there is a match. Hence, no need to store entries with duplicate keys.
151-
const bool dropDuplicates = !joinNode_->filter() &&
152-
(joinNode_->isLeftSemiFilterJoin() ||
153-
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
154157
// Right semi join needs to tag build rows that were probed.
155158
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
156159
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
159162
table_ = HashTable<false>::createForJoin(
160163
std::move(keyHashers),
161164
dependentTypes,
162-
!dropDuplicates, // allowDuplicates
165+
!dropDuplicates_, // allowDuplicates
163166
needProbedFlag, // hasProbedFlag
164167
operatorCtx_->driverCtx()
165168
->queryConfig()
@@ -170,15 +173,22 @@ void HashBuild::setupTable() {
170173
table_ = HashTable<true>::createForJoin(
171174
std::move(keyHashers),
172175
dependentTypes,
173-
!dropDuplicates, // allowDuplicates
176+
!dropDuplicates_, // allowDuplicates
174177
needProbedFlag, // hasProbedFlag
175178
operatorCtx_->driverCtx()
176179
->queryConfig()
177180
.minTableRowsForParallelJoinBuild(),
178181
pool());
179182
}
180183
}
184+
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
181185
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
186+
if (abandonBuildNoDupHashMinPct_ == 0) {
187+
// Building a HashTable without duplicates is disabled if
188+
// abandonBuildNoDupHashMinPct_ is 0.
189+
abandonBuildNoDupHash_ = true;
190+
table_->joinTableMayHaveDuplicates();
191+
}
182192
}
183193

184194
void HashBuild::setupSpiller(SpillPartition* spillPartition) {
@@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
377387
return;
378388
}
379389

390+
if (dropDuplicates_ && !abandonBuildNoDupHash_) {
391+
const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
392+
numHashInputRows_ += activeRows_.countSelected();
393+
if (abandonEarly) {
394+
// The hash table is no longer directly constructed in addInput. The data
395+
// that was previously inserted into the hash table is already in the
396+
// RowContainer.
397+
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
398+
abandonBuildNoDupHash_ = true;
399+
table_->joinTableMayHaveDuplicates();
400+
} else {
401+
table_->prepareForGroupProbe(
402+
*lookup_,
403+
input,
404+
activeRows_,
405+
BaseHashTable::kNoSpillInputStartPartitionBit);
406+
if (lookup_->rows.empty()) {
407+
return;
408+
}
409+
table_->groupProbe(
410+
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
411+
return;
412+
}
413+
}
414+
380415
if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
381416
hashes_.resize(activeRows_.end());
382417
}
@@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
756791
isInputFromSpill() ? spillConfig()->startPartitionBit
757792
: BaseHashTable::kNoSpillInputStartPartitionBit,
758793
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
759-
: nullptr);
794+
: nullptr,
795+
dropDuplicates_);
760796
}
761797
stats_.wlock()->addRuntimeStat(
762798
BaseHashTable::kBuildWallNanos,
@@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
879915
setupTable();
880916
setupSpiller(spillInput.spillPartition.get());
881917
stateCleared_ = false;
918+
numHashInputRows_ = 0;
882919

883920
// Start to process spill input.
884921
processSpillInput();
@@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
12401277
rows.data(), rows.size(), false, false, result->childAt(types.size()));
12411278
}
12421279
}
1280+
1281+
bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
1282+
VELOX_CHECK(dropDuplicates_);
1283+
return numHashInputRows_ > abandonBuildNoDupHashMinRows_ &&
1284+
numDistinct / numHashInputRows_ >= abandonBuildNoDupHashMinPct_ / 100;
1285+
}
12431286
} // namespace facebook::velox::exec

velox/exec/HashBuild.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,11 @@ class HashBuild final : public Operator {
204204
// not.
205205
bool nonReclaimableState() const;
206206

207+
// True if we have enough rows and not enough duplicate join keys, i.e. more
208+
// than 'abandonBuildNoDupHashMinRows_' rows and at least
209+
// 'abandonBuildNoDupHashMinPct_' % of rows are unique.
210+
bool abandonBuildNoDupHashEarly(int64_t numDistinct) const;
211+
207212
const std::shared_ptr<const core::HashJoinNode> joinNode_;
208213

209214
const core::JoinType joinType_;
@@ -242,6 +247,9 @@ class HashBuild final : public Operator {
242247
// Container for the rows being accumulated.
243248
std::unique_ptr<BaseHashTable> table_;
244249

250+
// Used for building hash table while adding input rows.
251+
std::unique_ptr<HashLookup> lookup_;
252+
245253
// Key channels in 'input_'
246254
std::vector<column_index_t> keyChannels_;
247255

@@ -269,6 +277,14 @@ class HashBuild final : public Operator {
269277
// at least one entry with null join keys.
270278
bool joinHasNullKeys_{false};
271279

280+
// Indicates whether to drop duplicate rows. Rows containing duplicate keys
281+
// can be removed for left semi and anti join.
282+
const bool dropDuplicates_;
283+
284+
// Whether to abandon building a HashTable without duplicates in HashBuild
285+
// addInput phase for left semi/anti join.
286+
bool abandonBuildNoDupHash_{false};
287+
272288
// The type used to spill hash table which might attach a boolean column to
273289
// record the probed flag if 'needProbedFlagSpill_' is true.
274290
RowTypePtr spillType_;
@@ -310,6 +326,19 @@ class HashBuild final : public Operator {
310326

311327
// Maps key channel in 'input_' to channel in key.
312328
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;
329+
330+
// Count the number of hash table input rows for building no duplicates
331+
// hash table. It will not be updated after abandonBuildNoDupHash_ is true.
332+
int64_t numHashInputRows_ = 0;
333+
334+
// Minimum number of rows to see before deciding whether to give up building
335+
// a hash table without duplicates.
336+
const int32_t abandonBuildNoDupHashMinRows_;
337+
338+
// Minimum percentage of unique rows at which to give up building a hash
339+
// table without duplicates. If at least this percentage of rows is unique,
340+
// building the hash table in the addInput phase is not worthwhile.
341+
const int32_t abandonBuildNoDupHashMinPct_;
313342
};
314343

315344
inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {

velox/exec/HashJoinBridge.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,18 @@ RowTypePtr hashJoinTableType(
4242
types.emplace_back(inputType->childAt(channel));
4343
}
4444

45+
if (joinNode->canDropDuplicates()) {
46+
// For left semi and anti join with no extra filter, hash table does not
47+
// store dependent columns.
48+
return ROW(std::move(names), std::move(types));
49+
}
50+
4551
for (auto i = 0; i < inputType->size(); ++i) {
4652
if (keyChannelSet.find(i) == keyChannelSet.end()) {
4753
names.emplace_back(inputType->nameOf(i));
4854
types.emplace_back(inputType->childAt(i));
4955
}
5056
}
51-
5257
return ROW(std::move(names), std::move(types));
5358
}
5459

velox/exec/HashTable.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ HashTable<ignoreNullKeys>::HashTable(
5757
pool_(pool),
5858
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
5959
isJoinBuild_(isJoinBuild),
60+
joinBuildNoDuplicates_(!allowDuplicates),
6061
buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
6162
std::vector<TypePtr> keys;
6263
for (auto& hasher : hashers_) {
@@ -1487,7 +1488,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
14871488
return;
14881489
}
14891490
disableRangeArrayHash_ |= disableRangeArrayHash;
1490-
if (numDistinct_ && !isJoinBuild_) {
1491+
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
1492+
// If the join type is left semi or anti, allowDuplicates_ will be false,
1493+
// and join build is building hash table while adding input rows.
14911494
if (!analyze()) {
14921495
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
14931496
return;
@@ -1709,8 +1712,20 @@ template <bool ignoreNullKeys>
17091712
void HashTable<ignoreNullKeys>::prepareJoinTable(
17101713
std::vector<std::unique_ptr<BaseHashTable>> tables,
17111714
int8_t spillInputStartPartitionBit,
1712-
folly::Executor* executor) {
1715+
folly::Executor* executor,
1716+
bool dropDuplicates) {
17131717
buildExecutor_ = executor;
1718+
if (dropDuplicates) {
1719+
if (table_ != nullptr) {
1720+
// Reset table_ and capacity_ to trigger rehash.
1721+
rows_->pool()->freeContiguous(tableAllocation_);
1722+
table_ = nullptr;
1723+
capacity_ = 0;
1724+
}
1725+
// Call analyze to insert all unique values in row container to the
1726+
// table hashers' uniqueValues_;
1727+
analyze();
1728+
}
17141729
otherTables_.reserve(tables.size());
17151730
for (auto& table : tables) {
17161731
otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>(
@@ -1740,6 +1755,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
17401755
}
17411756
if (useValueIds) {
17421757
for (auto& other : otherTables_) {
1758+
if (dropDuplicates) {
1759+
// Before merging with the current hashers, all values in the row
1760+
// containers of other table need to be inserted into uniqueValues_.
1761+
other->analyze();
1762+
}
17431763
for (auto i = 0; i < hashers_.size(); ++i) {
17441764
hashers_[i]->merge(*other->hashers_[i]);
17451765
if (!hashers_[i]->mayUseValueIds()) {

0 commit comments

Comments
 (0)