Skip to content

Commit ab47535

Browse files
Initial parallel sequence and parallel cte consumer and producer in ORCA.
1 parent 9932028 commit ab47535

35 files changed

+4034
-5
lines changed

src/backend/gpopt/translate/CTranslatorDXLToPlStmt.cpp

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ extern "C" {
6464
#include "naucrates/dxl/operators/CDXLPhysicalBitmapTableScan.h"
6565
#include "naucrates/dxl/operators/CDXLPhysicalCTAS.h"
6666
#include "naucrates/dxl/operators/CDXLPhysicalCTEConsumer.h"
67+
#include "naucrates/dxl/operators/CDXLPhysicalParallelCTEConsumer.h"
6768
#include "naucrates/dxl/operators/CDXLPhysicalCTEProducer.h"
69+
#include "naucrates/dxl/operators/CDXLPhysicalParallelCTEProducer.h"
70+
#include "naucrates/dxl/operators/CDXLPhysicalParallelSequence.h"
6871
#include "naucrates/dxl/operators/CDXLPhysicalDynamicBitmapTableScan.h"
6972
#include "naucrates/dxl/operators/CDXLPhysicalDynamicForeignScan.h"
7073
#include "naucrates/dxl/operators/CDXLPhysicalDynamicIndexOnlyScan.h"
@@ -461,6 +464,12 @@ CTranslatorDXLToPlStmt::TranslateDXLOperatorToPlan(
461464
ctxt_translation_prev_siblings);
462465
break;
463466
}
467+
case EdxlopPhysicalParallelSequence:
468+
{
469+
plan = TranslateDXLParallelSequence(dxlnode, output_context,
470+
ctxt_translation_prev_siblings);
471+
break;
472+
}
464473
case EdxlopPhysicalDynamicTableScan:
465474
{
466475
plan = TranslateDXLDynTblScan(dxlnode, output_context,
@@ -515,12 +524,24 @@ CTranslatorDXLToPlStmt::TranslateDXLOperatorToPlan(
515524
dxlnode, output_context, ctxt_translation_prev_siblings);
516525
break;
517526
}
527+
case EdxlopPhysicalParallelCTEProducer:
528+
{
529+
plan = TranslateDXLParallelCTEProducerToParallelSharedScan(
530+
dxlnode, output_context, ctxt_translation_prev_siblings);
531+
break;
532+
}
518533
case EdxlopPhysicalCTEConsumer:
519534
{
520535
plan = TranslateDXLCTEConsumerToSharedScan(
521536
dxlnode, output_context, ctxt_translation_prev_siblings);
522537
break;
523538
}
539+
case EdxlopPhysicalParallelCTEConsumer:
540+
{
541+
plan = TranslateDXLParallelCTEConsumerToParallelSharedScan(
542+
dxlnode, output_context, ctxt_translation_prev_siblings);
543+
break;
544+
}
524545
case EdxlopPhysicalBitmapTableScan:
525546
case EdxlopPhysicalDynamicBitmapTableScan:
526547
{
@@ -4772,6 +4793,71 @@ CTranslatorDXLToPlStmt::TranslateDXLCTEProducerToSharedScan(
47724793
return (Plan *) shared_input_scan;
47734794
}
47744795

4796+
//---------------------------------------------------------------------------
4797+
// @function:
4798+
// CTranslatorDXLToPlStmt::TranslateDXLParallelCTEProducerToParallelSharedScan
4799+
//
4800+
// @doc:
4801+
// Translate DXL CTE Producer node into GPDB parallel share input scan plan node
4802+
//
4803+
//---------------------------------------------------------------------------
4804+
Plan *
4805+
CTranslatorDXLToPlStmt::TranslateDXLParallelCTEProducerToParallelSharedScan(
4806+
const CDXLNode *cte_producer_dxlnode, CDXLTranslateContext *output_context,
4807+
CDXLTranslationContextArray *ctxt_translation_prev_siblings)
4808+
{
4809+
CDXLPhysicalParallelCTEProducer *cte_prod_dxlop =
4810+
CDXLPhysicalParallelCTEProducer::Cast(cte_producer_dxlnode->GetOperator());
4811+
ULONG cte_id = cte_prod_dxlop->Id();
4812+
ULONG parallel_workers = cte_prod_dxlop->UlParallelWorkers();
4813+
4814+
// create the Share Input Scan representing the CTE Producer
4815+
ShareInputScan *shared_input_scan = MakeNode(ShareInputScan);
4816+
shared_input_scan->share_id = cte_id;
4817+
shared_input_scan->discard_output = true;
4818+
4819+
Plan *plan = &(shared_input_scan->scan.plan);
4820+
plan->plan_node_id = m_dxl_to_plstmt_context->GetNextPlanId();
4821+
// Set parallel execution flags
4822+
plan->parallel_aware = true;
4823+
plan->parallel_safe = true;
4824+
plan->parallel = (int) parallel_workers;
4825+
4826+
m_dxl_to_plstmt_context->RegisterCTEProducerInfo(cte_id,
4827+
cte_prod_dxlop->GetOutputColIdxMap(), shared_input_scan);
4828+
4829+
// translate cost of the producer
4830+
TranslatePlanCosts(cte_producer_dxlnode, plan);
4831+
4832+
// translate child plan
4833+
CDXLNode *project_list_dxlnode = (*cte_producer_dxlnode)[0];
4834+
CDXLNode *child_dxlnode = (*cte_producer_dxlnode)[1];
4835+
4836+
CDXLTranslateContext child_context(m_mp, false,
4837+
output_context->GetColIdToParamIdMap());
4838+
Plan *child_plan = TranslateDXLOperatorToPlan(
4839+
child_dxlnode, &child_context, ctxt_translation_prev_siblings);
4840+
GPOS_ASSERT(nullptr != child_plan && "child plan cannot be NULL");
4841+
4842+
CDXLTranslationContextArray *child_contexts =
4843+
GPOS_NEW(m_mp) CDXLTranslationContextArray(m_mp);
4844+
child_contexts->Append(&child_context);
4845+
// translate proj list
4846+
plan->targetlist =
4847+
TranslateDXLProjList(project_list_dxlnode,
4848+
nullptr, // base table translation context
4849+
child_contexts, output_context);
4850+
4851+
plan->lefttree = child_plan;
4852+
plan->qual = NIL;
4853+
SetParamIds(plan);
4854+
4855+
// cleanup
4856+
child_contexts->Release();
4857+
4858+
return (Plan *) shared_input_scan;
4859+
}
4860+
47754861
//---------------------------------------------------------------------------
47764862
// @function:
47774863
// CTranslatorDXLToPlStmt::TranslateDXLCTEConsumerToSharedScan
@@ -4874,6 +4960,114 @@ CTranslatorDXLToPlStmt::TranslateDXLCTEConsumerToSharedScan(
48744960
return (Plan *) share_input_scan_cte_consumer;
48754961
}
48764962

4963+
//---------------------------------------------------------------------------
4964+
// @function:
4965+
// CTranslatorDXLToPlStmt::TranslateDXLParallelCTEConsumerToParallelSharedScan
4966+
//
4967+
// @doc:
4968+
// Translate DXL CTE Consumer node into GPDB parallel share input scan plan node
4969+
//
4970+
//---------------------------------------------------------------------------
4971+
Plan *
4972+
CTranslatorDXLToPlStmt::TranslateDXLParallelCTEConsumerToParallelSharedScan(
4973+
const CDXLNode *cte_consumer_dxlnode, CDXLTranslateContext *output_context,
4974+
CDXLTranslationContextArray * /*ctxt_translation_prev_siblings*/)
4975+
{
4976+
CDXLPhysicalParallelCTEConsumer *cte_consumer_dxlop =
4977+
CDXLPhysicalParallelCTEConsumer::Cast(cte_consumer_dxlnode->GetOperator());
4978+
ULONG cte_id = cte_consumer_dxlop->Id();
4979+
ULongPtrArray *output_colidx_map = cte_consumer_dxlop->GetOutputColIdxMap();
4980+
ULONG parallel_workers = cte_consumer_dxlop->UlParallelWorkers();
4981+
4982+
// get the producer idx map
4983+
ULongPtrArray *producer_colidx_map;
4984+
ShareInputScan *share_input_scan_cte_producer;
4985+
4986+
std::tie(producer_colidx_map, share_input_scan_cte_producer)
4987+
= m_dxl_to_plstmt_context->GetCTEProducerInfo(cte_id);
4988+
4989+
// init the consumer plan
4990+
ShareInputScan *share_input_scan_cte_consumer = MakeNode(ShareInputScan);
4991+
share_input_scan_cte_consumer->share_id = cte_id;
4992+
share_input_scan_cte_consumer->discard_output = false;
4993+
4994+
Plan *plan = &(share_input_scan_cte_consumer->scan.plan);
4995+
plan->plan_node_id = m_dxl_to_plstmt_context->GetNextPlanId();
4996+
4997+
// Set parallel execution flags
4998+
plan->parallel_aware = true;
4999+
plan->parallel_safe = true;
5000+
plan->parallel = (int) parallel_workers;
5001+
5002+
// translate operator costs
5003+
TranslatePlanCosts(cte_consumer_dxlnode, plan);
5004+
5005+
#ifdef GPOS_DEBUG
5006+
ULongPtrArray *output_colids_array =
5007+
cte_consumer_dxlop->GetOutputColIdsArray();
5008+
#endif
5009+
5010+
// generate the target list of the CTE Consumer
5011+
plan->targetlist = NIL;
5012+
CDXLNode *project_list_dxlnode = (*cte_consumer_dxlnode)[0];
5013+
const ULONG num_of_proj_list_elem = project_list_dxlnode->Arity();
5014+
GPOS_ASSERT(num_of_proj_list_elem == output_colids_array->Size());
5015+
for (ULONG ul = 0; ul < num_of_proj_list_elem; ul++)
5016+
{
5017+
AttrNumber varattno = (AttrNumber)ul + 1;
5018+
if (output_colidx_map) {
5019+
ULONG remapping_idx;
5020+
remapping_idx = *(*output_colidx_map)[ul];
5021+
if (producer_colidx_map) {
5022+
remapping_idx = *(*producer_colidx_map)[remapping_idx];
5023+
}
5024+
GPOS_ASSERT(remapping_idx != gpos::ulong_max);
5025+
varattno = (AttrNumber)remapping_idx + 1;
5026+
}
5027+
5028+
CDXLNode *proj_elem_dxlnode = (*project_list_dxlnode)[ul];
5029+
CDXLScalarProjElem *sc_proj_elem_dxlop =
5030+
CDXLScalarProjElem::Cast(proj_elem_dxlnode->GetOperator());
5031+
ULONG colid = sc_proj_elem_dxlop->Id();
5032+
GPOS_ASSERT(colid == *(*output_colids_array)[ul]);
5033+
5034+
CDXLNode *sc_ident_dxlnode = (*proj_elem_dxlnode)[0];
5035+
CDXLScalarIdent *sc_ident_dxlop =
5036+
CDXLScalarIdent::Cast(sc_ident_dxlnode->GetOperator());
5037+
OID oid_type = CMDIdGPDB::CastMdid(sc_ident_dxlop->MdidType())->Oid();
5038+
5039+
Var *var =
5040+
gpdb::MakeVar(OUTER_VAR, varattno, oid_type,
5041+
sc_ident_dxlop->TypeModifier(), 0 /* varlevelsup */);
5042+
5043+
CHAR *resname = CTranslatorUtils::CreateMultiByteCharStringFromWCString(
5044+
sc_proj_elem_dxlop->GetMdNameAlias()->GetMDName()->GetBuffer());
5045+
TargetEntry *target_entry = gpdb::MakeTargetEntry(
5046+
(Expr *) var, (AttrNumber)(ul + 1), resname, false /* resjunk */);
5047+
plan->targetlist = gpdb::LAppend(plan->targetlist, target_entry);
5048+
5049+
output_context->InsertMapping(colid, target_entry);
5050+
}
5051+
5052+
plan->qual = nullptr;
5053+
5054+
SetParamIds(plan);
5055+
5056+
// DON'T REMOVE, if current consumer need projection, then we can direct add it.
5057+
// we still keep the path of projection in consumer
5058+
5059+
// Plan *producer_plan = &(share_input_scan_cte_producer->scan.plan);
5060+
// if (output_colidx_map != nullptr) {
5061+
// share_input_scan_cte_consumer->need_projection = true;
5062+
// share_input_scan_cte_consumer->producer_targetlist = gpdb::ListCopy(producer_plan->targetlist);
5063+
// if (!share_input_scan_cte_consumer->producer_targetlist) {
5064+
// share_input_scan_cte_consumer->need_projection = false;
5065+
// }
5066+
// }
5067+
5068+
return (Plan *) share_input_scan_cte_consumer;
5069+
}
5070+
48775071
//---------------------------------------------------------------------------
48785072
// @function:
48795073
// CTranslatorDXLToPlStmt::TranslateDXLSequence
@@ -4931,6 +5125,71 @@ CTranslatorDXLToPlStmt::TranslateDXLSequence(
49315125
return (Plan *) psequence;
49325126
}
49335127

5128+
//---------------------------------------------------------------------------
5129+
// @function:
5130+
// CTranslatorDXLToPlStmt::TranslateDXLParallelSequence
5131+
//
5132+
// @doc:
5133+
// Translate DXL sequence node into GPDB parallel Sequence plan node
5134+
//
5135+
//---------------------------------------------------------------------------
5136+
Plan *
5137+
CTranslatorDXLToPlStmt::TranslateDXLParallelSequence(
5138+
const CDXLNode *sequence_dxlnode, CDXLTranslateContext *output_context,
5139+
CDXLTranslationContextArray *ctxt_translation_prev_siblings)
5140+
{
5141+
CDXLPhysicalParallelSequence *phy_parallel_sequence_dxlop =
5142+
CDXLPhysicalParallelSequence::Cast(sequence_dxlnode->GetOperator());
5143+
5144+
ULONG parallel_workers = phy_parallel_sequence_dxlop->UlParallelWorkers();
5145+
5146+
// create append plan node
5147+
Sequence *psequence = MakeNode(Sequence);
5148+
5149+
Plan *plan = &(psequence->plan);
5150+
plan->plan_node_id = m_dxl_to_plstmt_context->GetNextPlanId();
5151+
5152+
plan->parallel_safe = true;
5153+
plan->parallel = (int) parallel_workers;
5154+
5155+
// translate operator costs
5156+
TranslatePlanCosts(sequence_dxlnode, plan);
5157+
5158+
ULONG arity = sequence_dxlnode->Arity();
5159+
5160+
CDXLTranslateContext child_context(m_mp, false,
5161+
output_context->GetColIdToParamIdMap());
5162+
5163+
for (ULONG ul = 1; ul < arity; ul++)
5164+
{
5165+
CDXLNode *child_dxlnode = (*sequence_dxlnode)[ul];
5166+
5167+
Plan *child_plan = TranslateDXLOperatorToPlan(
5168+
child_dxlnode, &child_context, ctxt_translation_prev_siblings);
5169+
5170+
psequence->subplans = gpdb::LAppend(psequence->subplans, child_plan);
5171+
}
5172+
5173+
CDXLNode *project_list_dxlnode = (*sequence_dxlnode)[0];
5174+
5175+
CDXLTranslationContextArray *child_contexts =
5176+
GPOS_NEW(m_mp) CDXLTranslationContextArray(m_mp);
5177+
child_contexts->Append(&child_context);
5178+
5179+
// translate proj list
5180+
plan->targetlist =
5181+
TranslateDXLProjList(project_list_dxlnode,
5182+
nullptr, // base table translation context
5183+
child_contexts, output_context);
5184+
5185+
SetParamIds(plan);
5186+
5187+
// cleanup
5188+
child_contexts->Release();
5189+
5190+
return (Plan *) psequence;
5191+
}
5192+
49345193
//---------------------------------------------------------------------------
49355194
// @function:
49365195
// CTranslatorDXLToPlStmt::TranslateDXLDynTblScan

src/backend/gporca/libgpdbcost/include/gpdbcost/CCostModelGPDB.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,21 @@ class CCostModelGPDB : public ICostModel
123123
const CCostModelGPDB *pcmgpdb,
124124
const SCostingInfo *pci);
125125

126+
// cost of parallel CTE producer
127+
static CCost CostParallelCTEProducer(CMemoryPool *mp, CExpressionHandle &exprhdl,
128+
const CCostModelGPDB *pcmgpdb,
129+
const SCostingInfo *pci);
130+
126131
// cost of CTE consumer
127132
static CCost CostCTEConsumer(CMemoryPool *mp, CExpressionHandle &exprhdl,
128133
const CCostModelGPDB *pcmgpdb,
129134
const SCostingInfo *pci);
130135

136+
// cost of parallel CTE consumer
137+
static CCost CostParallelCTEConsumer(CMemoryPool *mp, CExpressionHandle &exprhdl,
138+
const CCostModelGPDB *pcmgpdb,
139+
const SCostingInfo *pci);
140+
131141
// cost of const table get
132142
static CCost CostConstTableGet(CMemoryPool *mp, CExpressionHandle &exprhdl,
133143
const CCostModelGPDB *pcmgpdb,
@@ -158,6 +168,11 @@ class CCostModelGPDB : public ICostModel
158168
const CCostModelGPDB *pcmgpdb,
159169
const SCostingInfo *pci);
160170

171+
// cost of parallel sequence
172+
static CCost CostParallelSequence(CMemoryPool *mp, CExpressionHandle &exprhdl,
173+
const CCostModelGPDB *pcmgpdb,
174+
const SCostingInfo *pci);
175+
161176
// cost of sort
162177
static CCost CostSort(CMemoryPool *mp, CExpressionHandle &exprhdl,
163178
const CCostModelGPDB *pcmgpdb,

0 commit comments

Comments
 (0)