Skip to content

Commit 2adbde1

Browse files
committed
Push the runtime filter to table am.
1 parent f10e493 commit 2adbde1

15 files changed

+799
-66
lines changed

src/backend/executor/nodeHash.c

+39-8
Original file line numberDiff line numberDiff line change
@@ -2584,6 +2584,7 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
25842584
Instrumentation *jinstrument = hjstate->js.ps.instrument;
25852585
int total_buckets;
25862586
int i;
2587+
HashState *hashState = (HashState *) innerPlanState(hjstate);
25872588

25882589
if (!hashtable ||
25892590
!hashtable->stats ||
@@ -2598,11 +2599,13 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
25982599

25992600
if (!hashtable->eagerlyReleased)
26002601
{
2601-
HashState *hashState = (HashState *) innerPlanState(hjstate);
2602-
26032602
/* Report on batch in progress, in case the join is being ended early. */
26042603
ExecHashTableExplainBatchEnd(hashState, hashtable);
26052604
}
2605+
if (gp_enable_runtime_filter_pushdown && hashState->filters)
2606+
{
2607+
ExecRFExplainEnd(hashState, buf);
2608+
}
26062609

26072610
/* Report actual work_mem high water mark. */
26082611
jinstrument->workmemused = Max(jinstrument->workmemused, stats->workmem_max);
@@ -4161,37 +4164,59 @@ PushdownRuntimeFilter(HashState *node)
41614164
scankeys = NIL;
41624165

41634166
attr_filter = lfirst(lc);
4164-
if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
4167+
if (!IsA(attr_filter->target, SeqScanState)
4168+
|| attr_filter->empty || attr_filter->hasnulls)
41654169
continue;
41664170

4171+
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
41674172
/* bloom filter */
41684173
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41694174
sk->sk_flags = SK_BLOOM_FILTER;
41704175
sk->sk_attno = attr_filter->lattno;
41714176
sk->sk_subtype = INT8OID;
41724177
sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
4178+
sk->sk_collation = attr_filter->collation;
41734179
scankeys = lappend(scankeys, sk);
41744180

4181+
if (attr_filter->n_distinct > 0)
4182+
{
4183+
int64 range = attr_filter->max - attr_filter->min + 1;
4184+
if ((range / attr_filter->n_distinct) > gp_runtime_filter_selectivity_threshold)
4185+
{
4186+
/* push previous scankeys */
4187+
sss->filters = list_concat(sss->filters, scankeys);
4188+
continue;
4189+
}
4190+
}
41754191
/* range filter */
41764192
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41774193
sk->sk_flags = 0;
41784194
sk->sk_attno = attr_filter->lattno;
41794195
sk->sk_strategy = BTGreaterEqualStrategyNumber;
4180-
sk->sk_subtype = INT8OID;
4196+
sk->sk_subtype = attr_filter->vartype;
41814197
sk->sk_argument = attr_filter->min;
4198+
sk->sk_collation = attr_filter->collation;
41824199
scankeys = lappend(scankeys, sk);
41834200

41844201
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41854202
sk->sk_flags = 0;
41864203
sk->sk_attno = attr_filter->lattno;
41874204
sk->sk_strategy = BTLessEqualStrategyNumber;
4188-
sk->sk_subtype = INT8OID;
4205+
sk->sk_subtype = attr_filter->vartype;
41894206
sk->sk_argument = attr_filter->max;
4207+
sk->sk_collation = attr_filter->collation;
41904208
scankeys = lappend(scankeys, sk);
41914209

41924210
/* append new runtime filters to target node */
4193-
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
4194-
sss->filters = list_concat(sss->filters, scankeys);
4211+
if (sss->ss.ss_currentScanDesc != NULL)
4212+
{
4213+
/* if seqscan is started, we can't pushdown the runtime filter */
4214+
list_free_deep(scankeys);
4215+
}
4216+
else
4217+
{
4218+
sss->filters = list_concat(sss->filters, scankeys);
4219+
}
41954220
}
41964221
}
41974222

@@ -4206,10 +4231,15 @@ AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot)
42064231
foreach (lc, node->filters)
42074232
{
42084233
attr_filter = (AttrFilter *) lfirst(lc);
4234+
if (attr_filter->hasnulls)
4235+
continue;
42094236

42104237
val = slot_getattr(slot, attr_filter->rattno, &isnull);
42114238
if (isnull)
4239+
{
4240+
attr_filter->hasnulls = true;
42124241
continue;
4242+
}
42134243

42144244
attr_filter->empty = false;
42154245

@@ -4258,6 +4288,7 @@ ResetRuntimeFilter(HashState *node)
42584288
{
42594289
attr_filter = lfirst(lc);
42604290
attr_filter->empty = true;
4291+
attr_filter->hasnulls = false;
42614292

42624293
if (IsA(attr_filter->target, SeqScanState))
42634294
{
@@ -4274,7 +4305,7 @@ ResetRuntimeFilter(HashState *node)
42744305

42754306
attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
42764307
work_mem,
4277-
random());
4308+
gp_session_id);
42784309

42794310
StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
42804311
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");

src/backend/executor/nodeHashjoin.c

+32-8
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110

111111
#include "access/htup_details.h"
112112
#include "access/parallel.h"
113+
#include "catalog/pg_statistic.h"
114+
#include "catalog/pg_namespace.h"
113115
#include "executor/executor.h"
114116
#include "executor/hashjoin.h"
115117
#include "executor/instrument.h" /* Instrumentation */
@@ -118,9 +120,12 @@
118120
#include "executor/nodeRuntimeFilter.h"
119121
#include "miscadmin.h"
120122
#include "pgstat.h"
123+
#include "utils/datum.h"
121124
#include "utils/guc.h"
122125
#include "utils/fmgroids.h"
126+
#include "utils/lsyscache.h"
123127
#include "utils/memutils.h"
128+
#include "utils/rel.h"
124129
#include "utils/sharedtuplestore.h"
125130

126131
#include "cdb/cdbvars.h"
@@ -168,10 +173,10 @@ static bool IsEqualOp(Expr *expr);
168173
static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
169174
static bool CheckTargetNode(PlanState *node,
170175
AttrNumber attno,
171-
AttrNumber *lattno);
176+
AttrNumber *lattno, Oid *collation, Oid *var_type);
172177
static List *FindTargetNodes(HashJoinState *hjstate,
173178
AttrNumber attno,
174-
AttrNumber *lattno);
179+
AttrNumber *lattno, Oid *collation, Oid *var_type);
175180
static AttrFilter *CreateAttrFilter(PlanState *target,
176181
AttrNumber lattno,
177182
AttrNumber rattno,
@@ -2192,6 +2197,8 @@ CreateRuntimeFilter(HashJoinState* hjstate)
21922197
AttrFilter *attr_filter;
21932198
ListCell *lc;
21942199
List *targets;
2200+
Oid var_type;
2201+
Oid collation;
21952202

21962203
/*
21972204
* A build-side Bloom filter tells us if a row is definitely not in the build
@@ -2232,7 +2239,7 @@ CreateRuntimeFilter(HashJoinState* hjstate)
22322239
if (lattno < 1 || rattno < 1)
22332240
continue;
22342241

2235-
targets = FindTargetNodes(hjstate, lattno, &lattno);
2242+
targets = FindTargetNodes(hjstate, lattno, &lattno, &collation, &var_type);
22362243
if (lattno == -1 || targets == NULL)
22372244
continue;
22382245

@@ -2243,6 +2250,8 @@ CreateRuntimeFilter(HashJoinState* hjstate)
22432250

22442251
attr_filter = CreateAttrFilter(target, lattno, rattno,
22452252
hstate->ps.plan->plan_rows);
2253+
attr_filter->vartype = var_type;
2254+
attr_filter->collation = collation;
22462255
if (attr_filter->blm_filter)
22472256
hstate->filters = lappend(hstate->filters, attr_filter);
22482257
else
@@ -2329,7 +2338,7 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
23292338
}
23302339

23312340
static bool
2332-
CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
2341+
CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno, Oid *collation, Oid *var_type)
23332342
{
23342343
Var *var;
23352344
TargetEntry *te;
@@ -2348,6 +2357,8 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
23482357
return false;
23492358

23502359
*lattno = var->varattno;
2360+
*collation = var->varcollid;
2361+
*var_type = var->vartype;
23512362

23522363
return true;
23532364
}
@@ -2360,7 +2371,7 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
23602371
* SeqScan <- target
23612372
*/
23622373
static List *
2363-
FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
2374+
FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno, Oid *collation, Oid *var_type)
23642375
{
23652376
Var *var;
23662377
PlanState *child, *parent;
@@ -2386,7 +2397,7 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
23862397
* result
23872398
* seqscan
23882399
*/
2389-
if (!CheckTargetNode(child, attno, lattno))
2400+
if (!CheckTargetNode(child, attno, lattno, collation, var_type))
23902401
return NULL;
23912402

23922403
targetNodes = lappend(targetNodes, child);
@@ -2404,7 +2415,7 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
24042415
for (int i = 0; i < as->as_nplans; i++)
24052416
{
24062417
child = as->appendplans[i];
2407-
if (!CheckTargetNode(child, attno, lattno))
2418+
if (!CheckTargetNode(child, attno, lattno, collation, var_type))
24082419
return NULL;
24092420

24102421
targetNodes = lappend(targetNodes, child);
@@ -2452,12 +2463,25 @@ CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno,
24522463
{
24532464
AttrFilter *attr_filter = palloc0(sizeof(AttrFilter));
24542465
attr_filter->empty = true;
2466+
attr_filter->hasnulls = false;
24552467
attr_filter->target = target;
24562468

24572469
attr_filter->lattno = lattno;
24582470
attr_filter->rattno = rattno;
2471+
attr_filter->n_distinct = 0.0;
24592472

2460-
attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random());
2473+
attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, gp_session_id);
2474+
2475+
if (target && IsA(target, SeqScanState))
2476+
{
2477+
HeapTuple statstuple;
2478+
SeqScanState *scan = (SeqScanState *)target;
2479+
statstuple = get_att_stats(RelationGetRelid(scan->ss.ss_currentRelation), lattno);
2480+
if (HeapTupleIsValid(statstuple))
2481+
{
2482+
attr_filter->n_distinct = ((Form_pg_statistic) GETSTRUCT(statstuple))->stadistinct;
2483+
}
2484+
}
24612485

24622486
StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
24632487
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");

src/backend/executor/nodeRuntimeFilter.c

+29-1
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@
2828

2929
#include "postgres.h"
3030

31+
#include "access/tableam.h"
3132
#include "catalog/pg_type.h"
33+
#include "catalog/pg_statistic.h"
34+
#include "catalog/pg_namespace.h"
3235
#include "executor/executor.h"
3336
#include "executor/hashjoin.h"
3437
#include "executor/nodeRuntimeFilter.h"
3538
#include "lib/bloomfilter.h"
3639
#include "miscadmin.h"
3740
#include "nodes/nodeFuncs.h"
3841
#include "nodes/pg_list.h"
42+
#include "utils/datum.h"
3943
#include "utils/lsyscache.h"
4044

4145
#include "cdb/cdbvars.h"
@@ -325,4 +329,28 @@ RFFillTupleValues(RuntimeFilterState *rfstate, List *values)
325329
rfstate->value_buf[idx] = *dp;
326330
idx++;
327331
}
328-
}
332+
}
333+
334+
void
335+
ExecRFExplainEnd(HashState *hashState, struct StringInfoData *buf)
336+
{
337+
ListCell *lc;
338+
AttrFilter *attr_filter;
339+
SeqScanState *sss;
340+
341+
if (!hashState->filters)
342+
return;
343+
344+
foreach (lc, hashState->filters)
345+
{
346+
attr_filter = lfirst(lc);
347+
if (attr_filter->empty || attr_filter->hasnulls)
348+
continue;
349+
350+
sss = castNode(SeqScanState, attr_filter->target);
351+
appendStringInfo(buf, "RF: %s attrno: %d, range[%ld, %ld], n_distinct: %.2f\n",
352+
RelationGetRelationName(sss->ss.ss_currentRelation),
353+
attr_filter->lattno, (int64_t) attr_filter->min,
354+
(int64_t) attr_filter->max, attr_filter->n_distinct);
355+
}
356+
}

0 commit comments

Comments
 (0)