Skip to content

Cherry-pick: Add explain analyze detailed info of hash agg (#15917) #1056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -4266,7 +4266,7 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
}

/* Display stats for each parallel worker */
if (es->analyze && aggstate->shared_info != NULL)
if (aggstate->shared_info != NULL)
{
for (int n = 0; n < aggstate->shared_info->num_workers; n++)
{
Expand Down
145 changes: 145 additions & 0 deletions src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans,

static void ExecEagerFreeAgg(AggState *node);

void agg_hash_explain_extra_message(AggState *aggstate);

/*
* Select the current grouping set; affects current_set and
* curaggcontext.
Expand Down Expand Up @@ -1540,6 +1542,16 @@ build_hash_tables(AggState *aggstate)
memory);

build_hash_table(aggstate, setno, nbuckets);

/* initialize some statistic info of hash table */
perhash->num_output_groups = 0;
perhash->num_spill_parts = 0;
perhash->num_expansions = 0;
perhash->bucket_total = 0;
perhash->bucket_used = 0;
perhash->chain_count = 0;
perhash->chain_length_total = 0;
perhash->chain_length_max = 0;
}

aggstate->hash_ngroups_current = 0;
Expand Down Expand Up @@ -1966,6 +1978,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
hashagg_spill_init(aggstate, spill, aggstate->hash_tapeinfo, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
perhash->num_spill_parts += spill->npartitions;
}

if (aggstate->ss.ps.instrument)
Expand Down Expand Up @@ -2019,6 +2032,12 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
}

/* update hashentrysize estimate based on contents */
/*
* Greenplum doesn't use hashentrysize in the instrumentation; it
* calculates the hash table chain length to get an accurate number.
*
* See the following code to collect hash table statistic info.
*/
if (aggstate->hash_ngroups_current > 0)
{
aggstate->hashentrysize =
Expand All @@ -2031,6 +2050,47 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Instrumentation *instrument = aggstate->ss.ps.instrument;

instrument->workmemused = aggstate->hash_mem_peak;

/*
* workmemwanted estimates how much memory would be needed to avoid
* scratch I/O, i.e. to load everything into the hashtable at once:
*
* 1. add meta_mem only when from_tape is false, because when we are
* reading from tape/spilled file, we can reuse the existing hash
* table's meta.
* 2. add hash_mem every time.
* 3. don't add buffer_mem since it's unnecessary when we can load
* everything into memory at once.
*/
if (!from_tape)
instrument->workmemwanted += meta_mem;
instrument->workmemwanted += hashkey_mem;

/* Scan all perhashs and collect hash table statistic info */
for (int setno = 0; setno < aggstate->num_hashes; setno++)
{
AggStatePerHash perhash = &aggstate->perhash[setno];
tuplehash_hash *hashtab = perhash->hashtable->hashtab;

Assert(hashtab);

perhash->num_expansions += hashtab->num_expansions;
perhash->bucket_total += hashtab->size;
perhash->bucket_used += hashtab->members;
if (hashtab->members > 0)
{
uint32 perht_chain_length_total = 0;
uint32 perht_chain_count = 0;

/* collect statistic info of chain length per hash table */
tuplehash_coll_stat(hashtab,
&(perhash->chain_length_max),
&perht_chain_length_total,
&perht_chain_count);
perhash->chain_count += perht_chain_count;
perhash->chain_length_total += perht_chain_length_total;
}
}
}
}

Expand Down Expand Up @@ -2225,6 +2285,7 @@ lookup_hash_entries(AggState *aggstate)
hashagg_spill_init(aggstate, spill, aggstate->hash_tapeinfo, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
perhash->num_spill_parts += spill->npartitions;

hashagg_spill_tuple(aggstate, spill, slot, hash);
pergroup[setno] = NULL;
Expand Down Expand Up @@ -2277,6 +2338,13 @@ ExecAgg(PlanState *pstate)
return result;
}

/* Save statistics into the cdbexplainbuf for EXPLAIN ANALYZE */
if (node->ss.ps.instrument &&
(node->ss.ps.instrument)->need_cdb &&
(node->phase->aggstrategy == AGG_HASHED ||
node->phase->aggstrategy == AGG_MIXED))
agg_hash_explain_extra_message(node);

return NULL;
}

Expand Down Expand Up @@ -2807,6 +2875,7 @@ agg_refill_hash_table(AggState *aggstate)
spill_initialized = true;
hashagg_spill_init(aggstate, &spill, tapeinfo, batch->used_bits,
batch->input_card, aggstate->hashentrysize);
aggstate->perhash[aggstate->current_set].num_spill_parts += spill.npartitions;
}
/* no memory for a new group, spill */
hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
Expand Down Expand Up @@ -2981,6 +3050,8 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
}
}

perhash->num_output_groups++;

/*
* Clear the per-output-tuple context for each group
*
Expand Down Expand Up @@ -5139,3 +5210,77 @@ ReuseHashTable(AggState *node)
!node->streaming &&
!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams));
}

/*
 * agg_hash_explain_extra_message
 *		Save hash-aggregation statistics into the node's cdbexplainbuf so
 *		that EXPLAIN ANALYZE can display them.
 *
 * Aggregates, across all grouping-set hash tables of this AggState:
 *   - spill statistics (groups output, spill partitions, disk usage),
 *     only when the hash aggregation ever spilled, and
 *   - hash table shape statistics (chain lengths, bucket usage,
 *     number of expansions), only for tables that had any chains.
 */
void
agg_hash_explain_extra_message(AggState *aggstate)
{
	/*
	 * Check cdbexplain_depositStatsToNode(), Greenplum only saves extra
	 * message text for the most interesting winning qExecs.
	 */
	StringInfo	hbuf = aggstate->ss.ps.cdbexplainbuf;
	uint64		sum_num_expansions = 0;
	uint64		sum_output_groups = 0;
	uint64		sum_spill_parts = 0;
	uint64		sum_chain_length_total = 0;
	uint64		sum_chain_count = 0;
	uint32		chain_length_max = 0;
	uint64		sum_bucket_used = 0;
	uint64		sum_bucket_total = 0;

	Assert(hbuf);

	appendStringInfo(hbuf, "hash table(s): %d", aggstate->num_hashes);

	/* Scan all perhashs and collect statistic info */
	for (int setno = 0; setno < aggstate->num_hashes; setno++)
	{
		AggStatePerHash perhash = &aggstate->perhash[setno];

		/* spill statistic info */
		if (aggstate->hash_ever_spilled)
		{
			sum_output_groups += perhash->num_output_groups;
			sum_spill_parts += perhash->num_spill_parts;
		}

		/* inner hash table statistic info */
		if (perhash->chain_count > 0)
		{
			sum_chain_length_total += perhash->chain_length_total;
			sum_chain_count += perhash->chain_count;
			if (perhash->chain_length_max > chain_length_max)
				chain_length_max = perhash->chain_length_max;

			/*
			 * Accumulate (not overwrite): with multiple grouping sets there
			 * is one hash table per set, and we must report the totals.
			 */
			sum_bucket_used += perhash->bucket_used;
			sum_bucket_total += perhash->bucket_total;
			sum_num_expansions += perhash->num_expansions;
		}
	}

	if (aggstate->hash_ever_spilled)
	{
		/* All summed counters are uint64; print with UINT64_FORMAT. */
		appendStringInfo(hbuf,
						 "; " UINT64_FORMAT " groups total in %d batches, " UINT64_FORMAT
						 " spill partitions; disk usage: " UINT64_FORMAT "KB",
						 sum_output_groups,
						 aggstate->hash_batches_used,
						 sum_spill_parts,
						 aggstate->hash_disk_used);
	}

	if (sum_chain_count > 0)
	{
		/* chain_length_max is uint32, so use %u rather than %d */
		appendStringInfo(hbuf,
						 "; chain length %.1f avg, %u max;"
						 " using " UINT64_FORMAT " of " UINT64_FORMAT " buckets;"
						 " total " UINT64_FORMAT " expansions.\n",
						 (double) sum_chain_length_total / sum_chain_count,
						 chain_length_max,
						 sum_bucket_used,
						 sum_bucket_total,
						 sum_num_expansions);
	}
}
23 changes: 23 additions & 0 deletions src/include/executor/nodeAgg.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,29 @@ typedef struct AggStatePerHashData
AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
AttrNumber *hashGrpColIdxHash; /* indices in hash table tuples */
Agg *aggnode; /* original Agg node, for numGroups etc. */

/*
* Some statistic info of hash table, used for EXPLAIN ANALYZE.
* Note that they are accumulated info and will not be reset even
* after the hash table is reset.
*/

/* number of groups/entries output by the iterator */
uint64 num_output_groups;
/* number of spilled partitions */
uint64 num_spill_parts;
/* number of hash table expansions */
uint32 num_expansions;
/* total number of buckets */
uint64 bucket_total;
/* number of used buckets */
uint64 bucket_used;
/* number of all chains */
uint64 chain_count;
/* total length of all chains */
uint64 chain_length_total;
/* max chain length */
uint32 chain_length_max;
} AggStatePerHashData;


Expand Down
81 changes: 81 additions & 0 deletions src/include/lib/simplehash.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
#define SH_ALLOCATE SH_MAKE_NAME(allocate)
#define SH_FREE SH_MAKE_NAME(free)
#define SH_STAT SH_MAKE_NAME(stat)
#define SH_COLL_STAT SH_MAKE_NAME(coll_stat)

/* internal helper functions (no externally visible prototypes) */
#define SH_COMPUTE_PARAMETERS SH_MAKE_NAME(compute_parameters)
Expand Down Expand Up @@ -169,6 +170,14 @@ typedef struct SH_TYPE

/* user defined data, useful for callbacks */
void *private_data;

/*
* number of times hash table is expanded
*
* Since the max size of hash table is UINT32_MAX, uint32 is good
* enough for the number of expanded times.
*/
uint32 num_expansions;
} SH_TYPE;

typedef enum SH_STATUS
Expand Down Expand Up @@ -244,6 +253,12 @@ SH_SCOPE SH_ELEMENT_TYPE *SH_ITERATE(SH_TYPE * tb, SH_ITERATOR * iter);
/* void <prefix>_stat(<prefix>_hash *tb */
SH_SCOPE void SH_STAT(SH_TYPE * tb);

SH_SCOPE void
SH_COLL_STAT(SH_TYPE * tb,
uint32 * max_chain_length,
uint32 * total_chain_length,
uint32 * chain_count);

#endif /* SH_DECLARE */


Expand Down Expand Up @@ -450,6 +465,7 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data)
SH_COMPUTE_PARAMETERS(tb, size);

tb->data = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * tb->size);
tb->num_expansions = 0;

return tb;
}
Expand All @@ -468,6 +484,7 @@ SH_RESET(SH_TYPE * tb)
{
memset(tb->data, 0, sizeof(SH_ELEMENT_TYPE) * tb->size);
tb->members = 0;
tb->num_expansions = 0;
}

/*
Expand Down Expand Up @@ -580,6 +597,7 @@ SH_GROW(SH_TYPE * tb, uint64 newsize)
}
}

(tb->num_expansions)++;
SH_FREE(tb, olddata);
}

Expand Down Expand Up @@ -1120,6 +1138,69 @@ SH_STAT(SH_TYPE * tb)
total_collisions, max_collisions, avg_collisions);
}

/*
 * Greenplum specific
 *
 * Collect some statistics about the state of the hashtable: the maximum
 * chain length, the total length of all chains, and the number of chains.
 * Major code was copied from SH_STAT() with some modifications to keep
 * consistent with GPDB6.
 *
 * Note: chain lengths here are 1-based (a bucket holding a single element
 * counts as a chain of length 1), unlike SH_STAT()'s 0-based distances.
 */
SH_SCOPE void
SH_COLL_STAT(SH_TYPE * tb,
			 uint32 * max_chain_length,
			 uint32 * total_chain_length,
			 uint32 * chain_count)
{
	uint32		last_dist = 0;

	*total_chain_length = 0;
	*chain_count = 0;

	/*
	 * tb->size is a uint64 (SH_MAX_SIZE exceeds INT32_MAX), so the loop
	 * index must be uint64 as well to avoid signed overflow / an
	 * infinite loop on very large tables.
	 */
	for (uint64 i = 0; i < tb->size; i++)
	{
		uint32		hash;
		uint32		optimal;
		uint32		dist;
		SH_ELEMENT_TYPE *elem;

		elem = &tb->data[i];

		if (elem->status != SH_STATUS_IN_USE)
			continue;

		hash = SH_ENTRY_HASH(tb, elem);
		optimal = SH_INITIAL_BUCKET(tb, hash);
		dist = SH_DISTANCE_FROM_OPTIMAL(tb, optimal, i);

		/*
		 * Different from SH_STAT(), always calculate chain length from 1 but
		 * not 0, e.g. when there is only one element in bucket, the length
		 * is 1.
		 */
		dist++;

		/*
		 * In same chain, dist must be always increasing. If dist < last_dist,
		 * we must hit a new chain; take the length of old chain into account.
		 */
		if (dist < last_dist)
		{
			if (last_dist > *max_chain_length)
				*max_chain_length = last_dist;
			*total_chain_length += last_dist;
			(*chain_count)++;
		}
		last_dist = dist;
	}

	/* Count the last chain. */
	if (last_dist != 0)
	{
		if (last_dist > *max_chain_length)
			*max_chain_length = last_dist;
		*total_chain_length += last_dist;
		(*chain_count)++;
	}
}

#endif /* SH_DEFINE */


Expand Down
Loading
Loading