@@ -201,12 +201,12 @@ void HashAggregation::addInput(RowVectorPtr input) {
201201
202202 const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
203203 abandonPartialAggregationEarly (groupingSet_->numDistinct ());
204- if (isDistinct_) {
204+ if (isDistinct_ && !distinctAggregationSpillEnabled_ ) {
205205 newDistincts_ = !groupingSet_->hashLookup ().newGroups .empty ();
206206
207207 if (newDistincts_) {
208208 // Save input to use for output in getOutput().
209- // input_ = input;
209+ input_ = input;
210210 } else if (abandonPartialEarly) {
211211 // If no new distinct groups (meaning we don't have anything to output)
212212 // and we are abandoning the partial aggregation, then we need to ensure
@@ -352,40 +352,36 @@ RowVectorPtr HashAggregation::getOutput() {
352352 // - partial aggregation reached memory limit;
353353 // - distinct aggregation has new keys;
354354 // - running in partial streaming mode and have some output ready.
355- if (!noMoreInput_ && !partialFull_ &&
355+ if (!noMoreInput_ && !partialFull_ && !newDistincts_ &&
356356 !groupingSet_->hasOutput ()) {
357357 input_ = nullptr ;
358358 return nullptr ;
359359 }
360360
361- #if 0
362- if (isDistinct_) {
361+ if (isDistinct_ && !distinctAggregationSpillEnabled_) {
363362 if (!newDistincts_) {
364363 if (noMoreInput_) {
365364 finished_ = true ;
366365 }
367366 return nullptr ;
368367 }
369368
370- if (!distinctAggregationSpillEnabled_) {
371- auto lookup = groupingSet_->hashLookup();
372- auto size = lookup.newGroups.size();
373- BufferPtr indices = allocateIndices(size, operatorCtx_->pool());
374- auto indicesPtr = indices->asMutable<vector_size_t>();
375- std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr);
376- newDistincts_ = false;
377- auto output = fillOutput(size, indices);
378- numOutputRows_ += size;
379-
380- // Drop reference to input_ to make it singly-referenced at the producer
381- // and allow for memory reuse.
382- input_ = nullptr;
383-
384- resetPartialOutputIfNeed();
385- return output;
386- }
369+ auto lookup = groupingSet_->hashLookup ();
370+ auto size = lookup.newGroups .size ();
371+ BufferPtr indices = allocateIndices (size, operatorCtx_->pool ());
372+ auto indicesPtr = indices->asMutable <vector_size_t >();
373+ std::copy (lookup.newGroups .begin (), lookup.newGroups .end (), indicesPtr);
374+ newDistincts_ = false ;
375+ auto output = fillOutput (size, indices);
376+ numOutputRows_ += size;
377+
378+ // Drop reference to input_ to make it singly-referenced at the producer
379+ // and allow for memory reuse.
380+ input_ = nullptr ;
381+
382+ resetPartialOutputIfNeed ();
383+ return output;
387384 }
388- #endif
389385
390386 const auto batchSize =
391387 isGlobal_ ? 1 : outputBatchRows (groupingSet_->estimateRowSize ());
0 commit comments