@@ -66,7 +66,12 @@ HashBuild::HashBuild(
6666      joinBridge_ (operatorCtx_->task ()->getHashJoinBridgeLocked(
6767          operatorCtx_->driverCtx ()->splitGroupId,
6868          planNodeId())),
69-       keyChannelMap_(joinNode_->rightKeys ().size()) {
69+       dropDuplicates_(joinNode_->canDropDuplicates ()),
70+       keyChannelMap_(joinNode_->rightKeys ().size()),
71+       abandonBuildNoDupHashMinRows_(
72+           driverCtx->queryConfig ().abandonBuildNoDupHashMinRows()),
73+       abandonBuildNoDupHashMinPct_(
74+           driverCtx->queryConfig ().abandonBuildNoDupHashMinPct()) {
7075  VELOX_CHECK (pool ()->trackUsage ());
7176  VELOX_CHECK_NOT_NULL (joinBridge_);
7277
@@ -86,19 +91,22 @@ HashBuild::HashBuild(
8691
8792  //  Identify the non-key build side columns and make a decoder for each.
8893  const  int32_t  numDependents = inputType->size () - numKeys;
89-   if  (numDependents > 0 ) {
90-     //  Number of join keys (numKeys) may be less then number of input columns
91-     //  (inputType->size()). In this case numDependents is negative and cannot be
92-     //  used to call 'reserve'. This happens when we join different probe side
93-     //  keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
94-     //  u.k AND t.k2 = u.k.
95-     dependentChannels_.reserve (numDependents);
96-     decoders_.reserve (numDependents);
97-   }
98-   for  (auto  i = 0 ; i < inputType->size (); ++i) {
99-     if  (keyChannelMap_.find (i) == keyChannelMap_.end ()) {
100-       dependentChannels_.emplace_back (i);
101-       decoders_.emplace_back (std::make_unique<DecodedVector>());
94+   if  (!dropDuplicates_) {
95+     if  (numDependents > 0 ) {
96+       //  Number of join keys (numKeys) may be less then number of input columns
97+       //  (inputType->size()). In this case numDependents is negative and cannot
98+       //  be used to call 'reserve'. This happens when we join different probe
99+       //  side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
100+       //  t.k1 = u.k AND t.k2 = u.k.
101+       dependentChannels_.reserve (numDependents);
102+       decoders_.reserve (numDependents);
103+     }
104+ 
105+     for  (auto  i = 0 ; i < inputType->size (); ++i) {
106+       if  (keyChannelMap_.find (i) == keyChannelMap_.end ()) {
107+         dependentChannels_.emplace_back (i);
108+         decoders_.emplace_back (std::make_unique<DecodedVector>());
109+       }
102110    }
103111  }
104112
@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
146154            .minTableRowsForParallelJoinBuild (),
147155        pool ());
148156  } else  {
149-     //  (Left) semi and anti join with no extra filter only needs to know whether
150-     //  there is a match. Hence, no need to store entries with duplicate keys.
151-     const  bool  dropDuplicates = !joinNode_->filter () &&
152-         (joinNode_->isLeftSemiFilterJoin () ||
153-          joinNode_->isLeftSemiProjectJoin () || isAntiJoin (joinType_));
154157    //  Right semi join needs to tag build rows that were probed.
155158    const  bool  needProbedFlag = joinNode_->isRightSemiFilterJoin ();
156159    if  (isLeftNullAwareJoinWithFilter (joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
159162      table_ = HashTable<false >::createForJoin (
160163          std::move (keyHashers),
161164          dependentTypes,
162-           !dropDuplicates , //  allowDuplicates
165+           !dropDuplicates_ , //  allowDuplicates
163166          needProbedFlag, //  hasProbedFlag
164167          operatorCtx_->driverCtx ()
165168              ->queryConfig ()
@@ -170,15 +173,22 @@ void HashBuild::setupTable() {
170173      table_ = HashTable<true >::createForJoin (
171174          std::move (keyHashers),
172175          dependentTypes,
173-           !dropDuplicates , //  allowDuplicates
176+           !dropDuplicates_ , //  allowDuplicates
174177          needProbedFlag, //  hasProbedFlag
175178          operatorCtx_->driverCtx ()
176179              ->queryConfig ()
177180              .minTableRowsForParallelJoinBuild (),
178181          pool ());
179182    }
180183  }
184+   lookup_ = std::make_unique<HashLookup>(table_->hashers (), pool ());
181185  analyzeKeys_ = table_->hashMode () != BaseHashTable::HashMode::kHash ;
186+   if  (abandonBuildNoDupHashMinPct_ == 0 ) {
187+     //  Building a HashTable without duplicates is disabled if
188+     //  abandonBuildNoDupHashMinPct_ is 0.
189+     abandonBuildNoDupHash_ = true ;
190+     table_->joinTableMayHaveDuplicates ();
191+   }
182192}
183193
184194void  HashBuild::setupSpiller (SpillPartition* spillPartition) {
@@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
377387    return ;
378388  }
379389
390+   if  (dropDuplicates_ && !abandonBuildNoDupHash_) {
391+     const  bool  abandonEarly = abandonBuildNoDupHashEarly (table_->numDistinct ());
392+     numHashInputRows_ += activeRows_.countSelected ();
393+     if  (abandonEarly) {
394+       //  The hash table is no longer directly constructed in addInput. The data
395+       //  that was previously inserted into the hash table is already in the
396+       //  RowContainer.
397+       addRuntimeStat (" abandonBuildNoDupHash"  , RuntimeCounter (1 ));
398+       abandonBuildNoDupHash_ = true ;
399+       table_->joinTableMayHaveDuplicates ();
400+     } else  {
401+       table_->prepareForGroupProbe (
402+           *lookup_,
403+           input,
404+           activeRows_,
405+           BaseHashTable::kNoSpillInputStartPartitionBit );
406+       if  (lookup_->rows .empty ()) {
407+         return ;
408+       }
409+       table_->groupProbe (
410+           *lookup_, BaseHashTable::kNoSpillInputStartPartitionBit );
411+       return ;
412+     }
413+   }
414+ 
380415  if  (analyzeKeys_ && hashes_.size () < activeRows_.end ()) {
381416    hashes_.resize (activeRows_.end ());
382417  }
@@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
756791        isInputFromSpill () ? spillConfig ()->startPartitionBit 
757792                           : BaseHashTable::kNoSpillInputStartPartitionBit ,
758793        allowParallelJoinBuild ? operatorCtx_->task ()->queryCtx ()->executor ()
759-                                : nullptr );
794+                                : nullptr ,
795+         dropDuplicates_);
760796  }
761797  stats_.wlock ()->addRuntimeStat (
762798      BaseHashTable::kBuildWallNanos ,
@@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
879915  setupTable ();
880916  setupSpiller (spillInput.spillPartition .get ());
881917  stateCleared_ = false ;
918+   numHashInputRows_ = 0 ;
882919
883920  //  Start to process spill input.
884921  processSpillInput ();
@@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
12401277        rows.data (), rows.size (), false , false , result->childAt (types.size ()));
12411278  }
12421279}
1280+ 
1281+ bool  HashBuild::abandonBuildNoDupHashEarly (int64_t  numDistinct) const  {
1282+   VELOX_CHECK (dropDuplicates_);
1283+   return  numHashInputRows_ > abandonBuildNoDupHashMinRows_ &&
1284+       numDistinct / numHashInputRows_ >= abandonBuildNoDupHashMinPct_ / 100 ;
1285+ }
12431286} //  namespace facebook::velox::exec
0 commit comments