diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 4cee357..9e02a46 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -107,6 +107,7 @@ static void show_tidbitmap_info(BitmapHeapScanState *planstate, static void show_instrumentation_count(const char *qlabel, int which, PlanState *planstate, ExplainState *es); static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es); +static void show_eval_params(Bitmapset *bms_params, ExplainState *es); static const char *explain_get_index_name(Oid indexId); static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir, @@ -637,7 +638,17 @@ ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc) */ ps = queryDesc->planstate; if (IsA(ps, GatherState) &&((Gather *) ps->plan)->invisible) + { + List *initPlanState = NULL; + PlanState *save_ps; + + /* initplans are always attached to the top node (cf standard_planner) */ + save_ps = ps; + initPlanState = ps->initPlan; ps = outerPlanState(ps); + ps->initPlan = initPlanState; + save_ps->initPlan = NIL; + } ExplainNode(ps, NIL, NULL, NULL, es); } @@ -1447,6 +1458,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gather->num_workers, es); + + /* Show params evaluated at gather node */ + if (gather->initParam) + show_eval_params(gather->initParam, es); + if (es->analyze) { int nworkers; @@ -1469,6 +1485,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gm->num_workers, es); + + /* Show params evaluated at gather-merge node */ + if (gm->initParam) + show_eval_params(gm->initParam, es); + if (es->analyze) { int nworkers; @@ -2494,6 +2515,29 @@ show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es) } /* + * Show initplan params evaluated at gather or gather merge node. + */ +static void +show_eval_params(Bitmapset *bms_params, ExplainState *es) +{ + int paramid = -1; + List *params = NIL; + + Assert(bms_params); + + while ((paramid = bms_next_member(bms_params, paramid)) >= 0) + { + char param[32]; + + snprintf(param, sizeof(param), "$%d", paramid); + params = lappend(params, pstrdup(param)); + } + + if (params) + ExplainPropertyList("Params Evaluated", params, es); +} + +/* * Fetch the name of an index in an EXPLAIN * * We allow plugins to get control here so that plans involving hypothetical diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index bd8a15d..5eff791 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -1861,6 +1861,33 @@ ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext) } /* + * ExecEvalParamExecParams + * + * Execute the subplan stored in PARAM_EXEC initplans params, if not executed + * till now. + */ +void +ExecEvalParamExecParams(Bitmapset *params, EState *estate) +{ + ParamExecData *prm; + int paramid; + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + prm = &(estate->es_param_exec_vals[paramid]); + + if (prm->execPlan != NULL) + { + /* Parameter not evaluated yet, so go do it */ + ExecSetParamPlan(prm->execPlan, GetPerTupleExprContext(estate)); + /* ExecSetParamPlan should have processed this param... */ + Assert(prm->execPlan == NULL); + } + } +} + +/* * Evaluate a PARAM_EXTERN parameter. * * PARAM_EXTERN parameters must be sought in ecxt_param_list_info. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 8737cc1..e758521 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "executor/execExpr.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeBitmapHeapscan.h" @@ -31,6 +32,7 @@ #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" +#include "executor/nodeSubplan.h" #include "executor/nodeSort.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -56,6 +58,7 @@ #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) +#define PARALLEL_KEY_INITPLAN_PARAMS UINT64CONST(0xE000000000000009) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -395,7 +398,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) * execution and return results to the main backend. */ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, +ExecInitParallelPlan(PlanState *planstate, EState *estate, + Bitmapset *initParam, int nworkers, int64 tuples_needed) { ParallelExecutorInfo *pei; @@ -406,10 +410,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, char *pstmt_data; char *pstmt_space; char *param_space; + char *initplan_param_space; BufferUsage *bufusage_space; SharedExecutorInstrumentation *instrumentation = NULL; int pstmt_len; int param_len; + int initplan_param_len; int instrumentation_len = 0; int instrument_offset = 0; Size dsa_minsize = dsa_minimum_size(); @@ -421,6 +427,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, pei->finished = false; pei->planstate = planstate; + ExecEvalParamExecParams(initParam, estate); + /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); @@ -454,6 +462,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, shm_toc_estimate_chunk(&pcxt->estimator, param_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for initplan params. */ + initplan_param_len = EstimateInitPlanParamsSpace(estate->es_param_exec_vals, initParam); + shm_toc_estimate_chunk(&pcxt->estimator, initplan_param_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -528,6 +541,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space); SerializeParamList(estate->es_param_list_info, ¶m_space); + /* Store serialized initplan params. */ + initplan_param_space = shm_toc_allocate(pcxt->toc, initplan_param_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INITPLAN_PARAMS, initplan_param_space); + SerializeInitPlanParams(estate->es_param_exec_vals, initParam, &initplan_param_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -865,6 +883,21 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) } /* + * Copy the ParamExecData params corresponding to initplans from dynamic + * shared memory. This has to be done once the params are allocated by + * executor; that is after ExecutorStart(). + */ +static void +ExecParallelInitializeInitPlanParams(shm_toc *toc, ParamExecData *params) +{ + char *paramspace; + + /* Reconstruct initplan params. */ + paramspace = shm_toc_lookup(toc, PARALLEL_KEY_INITPLAN_PARAMS, false); + RestoreInitPlanParams(¶mspace, params); +} + +/* * Create a QueryDesc for the PlannedStmt we are to execute, and return it. */ static QueryDesc * @@ -1049,6 +1082,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Special executor initialization steps for parallel workers */ queryDesc->planstate->state->es_query_dsa = area; + ExecParallelInitializeInitPlanParams(toc, queryDesc->estate->es_param_exec_vals); ExecParallelInitializeWorker(queryDesc->planstate, toc); /* Pass down any tuple bound */ diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 022d75b..4e35c51 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -158,6 +158,7 @@ ExecGather(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, + gather->initParam, gather->num_workers, node->tuples_needed); else diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index d20d466..ffff1a7 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -202,6 +202,7 @@ ExecGatherMerge(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, + gm->initParam, gm->num_workers, node->tuples_needed); else diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c index 4447b7c..52514e4 100644 --- a/src/backend/executor/nodeNestloop.c +++ b/src/backend/executor/nodeNestloop.c @@ -130,6 +130,7 @@ ExecNestLoop(PlanState *pstate) { NestLoopParam *nlp = (NestLoopParam *) lfirst(lc); int paramno = nlp->paramno; + TupleDesc tdesc = outerTupleSlot->tts_tupleDescriptor; ParamExecData *prm; prm = &(econtext->ecxt_param_exec_vals[paramno]); @@ -140,6 +141,7 @@ ExecNestLoop(PlanState *pstate) prm->value = slot_getattr(outerTupleSlot, nlp->paramval->varattno, &(prm->isnull)); + prm->ptype = TupleDescAttr(tdesc, nlp->paramval->varattno - 1)->atttypid; /* Flag parameter value as changed */ innerPlan->chgParam = bms_add_member(innerPlan->chgParam, paramno); diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 77ef6f3..06409ed 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -30,12 +30,16 @@ #include #include "access/htup_details.h" +#include "catalog/pg_type.h" #include "executor/executor.h" #include "executor/nodeSubplan.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" #include "miscadmin.h" #include "optimizer/clauses.h" +#include "storage/shmem.h" #include "utils/array.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -273,11 +277,13 @@ ExecScanSubPlan(SubPlanState *node, forboth(l, subplan->parParam, pvar, node->args) { int paramid = lfirst_int(l); + ExprState *exprstate = (ExprState *) lfirst(pvar); ParamExecData *prm = &(econtext->ecxt_param_exec_vals[paramid]); - prm->value = ExecEvalExprSwitchContext((ExprState *) lfirst(pvar), + prm->value = ExecEvalExprSwitchContext(exprstate, econtext, &(prm->isnull)); + prm->ptype = exprType((Node *) exprstate->expr); planstate->chgParam = bms_add_member(planstate->chgParam, paramid); } @@ -390,6 +396,7 @@ ExecScanSubPlan(SubPlanState *node, prmdata = &(econtext->ecxt_param_exec_vals[paramid]); Assert(prmdata->execPlan == NULL); prmdata->value = slot_getattr(slot, col, &(prmdata->isnull)); + prmdata->ptype = TupleDescAttr(tdesc, col - 1)->atttypid; col++; } @@ -556,11 +563,13 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext) { int paramid = lfirst_int(plst); ParamExecData *prmdata; + TupleDesc tdesc = slot->tts_tupleDescriptor; prmdata = &(innerecontext->ecxt_param_exec_vals[paramid]); Assert(prmdata->execPlan == NULL); prmdata->value = slot_getattr(slot, col, &(prmdata->isnull)); + prmdata->ptype = TupleDescAttr(tdesc, col - 1)->atttypid; col++; } slot = ExecProject(node->projRight); @@ -924,6 +933,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) ListCell *l; bool found = false; ArrayBuildStateAny *astate = NULL; + Oid ptype; if (subLinkType == ANY_SUBLINK || subLinkType == ALL_SUBLINK) @@ -931,6 +941,8 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) if (subLinkType == CTE_SUBLINK) elog(ERROR, "CTE subplans should not be executed via ExecSetParamPlan"); + ptype = exprType((Node *) node->subplan); + /* Initialize ArrayBuildStateAny in caller's context, if needed */ if (subLinkType == ARRAY_SUBLINK) astate = initArrayResultAny(subplan->firstColType, @@ -953,11 +965,13 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) forboth(l, subplan->parParam, pvar, node->args) { int paramid = lfirst_int(l); + ExprState *exprstate = (ExprState *) lfirst(pvar); ParamExecData *prm = &(econtext->ecxt_param_exec_vals[paramid]); - prm->value = ExecEvalExprSwitchContext((ExprState *) lfirst(pvar), + prm->value = ExecEvalExprSwitchContext(exprstate, econtext, &(prm->isnull)); + prm->ptype = exprType((Node *) exprstate->expr); planstate->chgParam = bms_add_member(planstate->chgParam, paramid); } @@ -980,6 +994,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = BoolGetDatum(true); + prm->ptype = ptype; prm->isnull = false; found = true; break; @@ -1031,6 +1046,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = heap_getattr(node->curTuple, i, tdesc, &(prm->isnull)); + prm->ptype = TupleDescAttr(tdesc, i - 1)->atttypid; i++; } } @@ -1053,6 +1069,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) true); prm->execPlan = NULL; prm->value = node->curArray; + prm->ptype = ptype; prm->isnull = false; } else if (!found) @@ -1065,6 +1082,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = BoolGetDatum(false); + prm->ptype = ptype; prm->isnull = false; } else @@ -1077,6 +1095,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = (Datum) 0; + prm->ptype = VOIDOID; prm->isnull = true; } } @@ -1207,3 +1226,133 @@ ExecAlternativeSubPlan(AlternativeSubPlanState *node, return ExecSubPlan(activesp, econtext, isNull); } + +/* + * Estimate the amount of space required to serialize the InitPlan params. + */ +Size +EstimateInitPlanParamsSpace(ParamExecData *paramExecVals, Bitmapset *params) +{ + int paramid; + Size sz = sizeof(int); + ParamExecData *prm; + + if (params == NULL) + return sz; + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + + prm = &(paramExecVals[paramid]); + typeOid = prm->ptype; + + sz = add_size(sz, sizeof(int)); /* space for paramid */ + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + sz = add_size(sz, + datumEstimateSpace(prm->value, prm->isnull, typByVal, typLen)); + } + return sz; +} + +/* + * Serialize ParamExecData params corresponding to initplans. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte paramid (location of param in execution time internal + * parameter array), 4-byte type OID, and then the datum as serialized by + * datumSerialize(). + * + * The above format is quite similar to the format used to serialize + * paramListInfo structure, so if we change either format, then consider to + * change at both the places. + */ +void +SerializeInitPlanParams(ParamExecData *paramExecVals, Bitmapset *params, + char **start_address) +{ + int nparams; + int paramid; + ParamExecData *prm; + + nparams = bms_num_members(params); + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + + prm = &(paramExecVals[paramid]); + typeOid = prm->ptype; + + /* Write paramid. */ + memcpy(*start_address, ¶mid, sizeof(int)); + *start_address += sizeof(int); + + /* Write OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* space for datum/isnull */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + datumSerialize(prm->value, prm->isnull, typByVal, typLen, + start_address); + } +} + +/* + * Restore ParamExecData params corresponding to initplans. + */ +void +RestoreInitPlanParams(char **start_address, ParamExecData *params) +{ + int nparams; + int i; + int paramid; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + for (i = 0; i < nparams; i++) + { + ParamExecData *prm; + + /* Read paramid */ + memcpy(¶mid, *start_address, sizeof(int)); + *start_address += sizeof(int); + prm = ¶ms[paramid]; + + /* Read type OID. */ + memcpy(&prm->ptype, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + prm->value = datumRestore(start_address, &prm->isnull); + prm->execPlan = NULL; + } +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index f1bed14..dbac5fb 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -364,6 +364,7 @@ _copyGather(const Gather *from) COPY_SCALAR_FIELD(rescan_param); COPY_SCALAR_FIELD(single_copy); COPY_SCALAR_FIELD(invisible); + COPY_BITMAPSET_FIELD(initParam); return newnode; } @@ -391,6 +392,7 @@ _copyGatherMerge(const GatherMerge *from) COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); + COPY_BITMAPSET_FIELD(initParam); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index b83d919..9ead312 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -482,6 +482,7 @@ _outGather(StringInfo str, const Gather *node) WRITE_INT_FIELD(rescan_param); WRITE_BOOL_FIELD(single_copy); WRITE_BOOL_FIELD(invisible); + WRITE_BITMAPSET_FIELD(initParam); } static void @@ -512,6 +513,8 @@ _outGatherMerge(StringInfo str, const GatherMerge *node) appendStringInfoString(str, " :nullsFirst"); for (i = 0; i < node->numCols; i++) appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); + + WRITE_BITMAPSET_FIELD(initParam); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index fbf8330..baee0cd 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2166,6 +2166,7 @@ _readGather(void) READ_INT_FIELD(rescan_param); READ_BOOL_FIELD(single_copy); READ_BOOL_FIELD(invisible); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } @@ -2187,6 +2188,7 @@ _readGatherMerge(void) READ_OID_ARRAY(sortOperators, local_node->numCols); READ_OID_ARRAY(collations, local_node->numCols); READ_BOOL_ARRAY(nullsFirst, local_node->numCols); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 5b746a9..1044b0e 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -39,6 +39,7 @@ #include "optimizer/planner.h" #include "optimizer/prep.h" #include "optimizer/restrictinfo.h" +#include "optimizer/subselect.h" #include "optimizer/tlist.h" #include "optimizer/var.h" #include "parser/parse_clause.h" @@ -2238,6 +2239,14 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel) Path *simple_gather_path; ListCell *lc; + /* + * We don't want to generate gather or gather merge node if there are + * initplans at some query level below the current query level as those + * plans could be parallel-unsafe or undirect correlated plans. + */ + if (contains_parallel_unsafe_param(root, rel)) + return; + /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 2821662..0363d68 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -6257,6 +6257,7 @@ make_gather(List *qptlist, node->rescan_param = rescan_param; node->single_copy = single_copy; node->invisible = false; + node->initParam = NULL; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 7f146d6..47a6b0d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -366,6 +366,14 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { Gather *gather = makeNode(Gather); + /* + * If there are any initPlans attached to the formerly-top plan node, + * move them up to the Gather node; same as we do for Material node in + * materialize_finished_plan. + */ + gather->plan.initPlan = top_plan->initPlan; + top_plan->initPlan = NIL; + gather->plan.targetlist = top_plan->targetlist; gather->plan.qual = NIL; gather->plan.lefttree = top_plan; @@ -3580,6 +3588,12 @@ create_grouping_paths(PlannerInfo *root, ListCell *lc; + /* + * Don't parallelize the plan if there is an initplan below current query + * level. See generate_gather_paths() for detailed reason. + */ + (void) contains_parallel_unsafe_param(root, input_rel); + /* For now, do all work in the (GROUP_AGG, NULL) upperrel */ grouped_rel = fetch_upper_rel(root, UPPERREL_GROUP_AGG, NULL); @@ -4942,6 +4956,12 @@ create_ordered_paths(PlannerInfo *root, RelOptInfo *ordered_rel; ListCell *lc; + /* + * Don't parallelize the plan if there is an initplan below current query + * level. See generate_gather_paths() for detailed reason. + */ + (void) contains_parallel_unsafe_param(root, input_rel); + /* For now, do all work in the (ORDERED, NULL) upperrel */ ordered_rel = fetch_upper_rel(root, UPPERREL_ORDERED, NULL); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index b0c9e94..5fa2bb1 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -104,6 +104,7 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_param_references(PlannerInfo *root, Plan *plan); static Node *convert_combining_aggrefs(Node *node, void *context); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); @@ -628,7 +629,10 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Gather: case T_GatherMerge: - set_upper_references(root, plan, rtoffset); + { + set_upper_references(root, plan, rtoffset); + set_param_references(root, plan); + } break; case T_Hash: @@ -1784,6 +1788,49 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) } /* + * set_param_references + * Initialize the initParam list in gather or gather merge node such that + * it contains reference of all the params that needs to be evaluated + * before execution of the node. It contains the initplan params that are + * being passed to the plan nodes below it. + */ +static void +set_param_references(PlannerInfo *root, Plan *plan) +{ + Assert(IsA(plan, Gather) || IsA(plan, GatherMerge)); + + if (plan->lefttree->extParam) + { + PlannerInfo *proot; + Bitmapset *initSetParam = NULL; + ListCell *l; + + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + { + initSetParam = bms_add_member(initSetParam, lfirst_int(l2)); + } + } + } + + /* + * Remember the list of all external initplan params that are used by + * the children of gather or gather merge node. + */ + if (IsA(plan, Gather)) + ((Gather *) plan)->initParam = bms_intersect(plan->lefttree->extParam, initSetParam); + else + ((GatherMerge *) plan)->initParam = bms_intersect(plan->lefttree->extParam, initSetParam); + } +} + +/* * Recursively scan an expression tree and convert Aggrefs to the proper * intermediate form for combining aggregates. This means (1) replacing each * one's argument list with a single argument that is the original Aggref diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 1103984..8c99f48 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -84,6 +84,7 @@ static Bitmapset *finalize_plan(PlannerInfo *root, Bitmapset *scan_params); static bool finalize_primnode(Node *node, finalize_primnode_context *context); static bool finalize_agg_primnode(Node *node, finalize_primnode_context *context); +static bool is_initplan_is_below_current_query_level(PlannerInfo *root); /* @@ -2136,13 +2137,11 @@ SS_identify_outer_params(PlannerInfo *root) } /* - * SS_charge_for_initplans - account for initplans in Path costs & parallelism + * SS_charge_for_initplans - account for initplans in Path costs * * If any initPlans have been created in the current query level, they will * get attached to the Plan tree created from whichever Path we select from - * the given rel. Increment all that rel's Paths' costs to account for them, - * and make sure the paths get marked as parallel-unsafe, since we can't - * currently transmit initPlans to parallel workers. + * the given rel. Increment all that rel's Paths' costs to account for them. * * This is separate from SS_attach_initplans because we might conditionally * create more initPlans during create_plan(), depending on which Path we @@ -2174,7 +2173,7 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) } /* - * Now adjust the costs and parallel_safe flags. + * Now adjust the costs. */ foreach(lc, final_rel->pathlist) { @@ -2182,7 +2181,6 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) path->startup_cost += initplan_cost; path->total_cost += initplan_cost; - path->parallel_safe = false; } /* We needn't do set_cheapest() here, caller will do it */ @@ -3000,3 +2998,61 @@ SS_make_initplan_from_plan(PlannerInfo *root, /* Set costs of SubPlan using info from the plan tree */ cost_subplan(subroot, node, plan); } + +/* + * is_initplan_below_current_query_level - is there any initplan present below + * current query level. + */ +static bool +is_initplan_is_below_current_query_level(PlannerInfo *root) +{ + ListCell *lc; + + /* + * If the subplan corresponding to the subroot is an initPlan, it'll be + * attached to its parent root. Hence, we check the query level of its + * parent root and if any init_plans are attached there. + */ + foreach(lc, root->glob->subroots) + { + PlannerInfo *subroot = (PlannerInfo *) lfirst(lc); + PlannerInfo *proot = subroot->parent_root; + + if (proot->query_level > root->query_level && proot->init_plans) + return true; + } + + return false; +} + +/* + * contains_parallel_unsafe_param - Check if there is any initplan present below + * current query level, mark all the partial and non-partial paths for a relation + * at this level as parallel-unsafe. + */ +bool +contains_parallel_unsafe_param(PlannerInfo *root, RelOptInfo *rel) +{ + ListCell *lc; + + if (is_initplan_is_below_current_query_level(root)) + { + foreach(lc, rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + + subpath->parallel_safe = false; + } + foreach(lc, rel->pathlist) + { + Path *subpath = (Path *) lfirst(lc); + + subpath->parallel_safe = false; + } + rel->consider_parallel = false; + + return true; + } + + return false; +} diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 93add27..48e502c 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -91,6 +91,7 @@ typedef struct typedef struct { + PlannerInfo *root; char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ @@ -1069,6 +1070,7 @@ max_parallel_hazard(Query *parse) { max_parallel_hazard_context context; + context.root = NULL; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; @@ -1098,6 +1100,7 @@ is_parallel_safe(PlannerInfo *root, Node *node) root->glob->nParamExec == 0) return true; /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ + context.root = root; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; @@ -1222,21 +1225,47 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) } /* - * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXEC Params listed in + * As of now, we can only pass Params that refer to the same or parent + * query level (see generate_gather_paths) or they are listed in * safe_param_ids, meaning they could be generated within the worker. */ else if (IsA(node, Param)) { + int paramid; + PlannerInfo *root; Param *param = (Param *) node; - if (param->paramkind != PARAM_EXEC || - !list_member_int(context->safe_param_ids, param->paramid)) + if (list_member_int(context->safe_param_ids, param->paramid)) + return false; + + root = context->root; + paramid = ((Param *) node)->paramid; + + if (root) { - if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) - return true; + PlannerInfo *proot; + ListCell *l; + + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + { + int initparam = lfirst_int(l2); + + if (paramid == initparam) + return false; + } + } + } } - return false; /* nothing to recurse to */ + + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + return true; } /* diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 8ee0496..c44ad88 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -611,6 +611,7 @@ extern ExprEvalOp ExecEvalStepOp(ExprState *state, ExprEvalStep *op); */ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamExecParams(Bitmapset *params, EState *estate); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index ed231f2..e62c596 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -35,7 +35,8 @@ typedef struct ParallelExecutorInfo } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, - EState *estate, int nworkers, int64 tuples_needed); + EState *estate, Bitmapset *initParam, int nworkers, + int64 tuples_needed); extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei, TupleDesc tupDesc); extern void ExecParallelFinish(ParallelExecutorInfo *pei); diff --git a/src/include/executor/nodeSubplan.h b/src/include/executor/nodeSubplan.h index 5dbaeeb..40b1a34 100644 --- a/src/include/executor/nodeSubplan.h +++ b/src/include/executor/nodeSubplan.h @@ -28,4 +28,10 @@ extern void ExecReScanSetParamPlan(SubPlanState *node, PlanState *parent); extern void ExecSetParamPlan(SubPlanState *node, ExprContext *econtext); +extern Size EstimateInitPlanParamsSpace(ParamExecData *paramExecVals, Bitmapset *params); + +extern void SerializeInitPlanParams(ParamExecData *paramExecVals, Bitmapset *params, char **start_address); + +extern void RestoreInitPlanParams(char **start_address, ParamExecData *params); + #endif /* NODESUBPLAN_H */ diff --git a/src/include/nodes/params.h b/src/include/nodes/params.h index 55219da..e96a1ea 100644 --- a/src/include/nodes/params.h +++ b/src/include/nodes/params.h @@ -98,6 +98,16 @@ typedef struct ParamExecData { void *execPlan; /* should be "SubPlanState *" */ Datum value; + + /* + * parameter's datatype, or 0. This is required so that datum value can + * be read and used for other purposes like passing it to worker backend + * via shared memory. This is required only for initPlan's evaluation, + * however for consistency we set this for Subplan as well. We left it + * for other cases like CTE or RecursiveUnion cases where this structure + * is not used for evaluation of subplans. + */ + Oid ptype; bool isnull; } ParamExecData; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index a382331..a16909c 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -841,6 +841,8 @@ typedef struct Gather int rescan_param; /* ID of Param that signals a rescan, or -1 */ bool single_copy; /* don't execute plan more than once */ bool invisible; /* suppress EXPLAIN display (for testing)? */ + Bitmapset *initParam; /* param id's of initplans which are referred + * at gather or one of it's child node */ } Gather; /* ------------ @@ -858,6 +860,8 @@ typedef struct GatherMerge Oid *sortOperators; /* OIDs of operators to sort them by */ Oid *collations; /* OIDs of collations */ bool *nullsFirst; /* NULLS FIRST/LAST directions */ + Bitmapset *initParam; /* param id's of initplans which are referred + * at gather merge or one of it's child node */ } GatherMerge; /* ---------------- diff --git a/src/include/optimizer/subselect.h b/src/include/optimizer/subselect.h index ecd2011..a35df55 100644 --- a/src/include/optimizer/subselect.h +++ b/src/include/optimizer/subselect.h @@ -35,6 +35,8 @@ extern Param *SS_make_initplan_output_param(PlannerInfo *root, extern void SS_make_initplan_from_plan(PlannerInfo *root, PlannerInfo *subroot, Plan *plan, Param *prm); +extern bool initplan_is_below_current_query_level(PlannerInfo *root); +extern bool contains_parallel_unsafe_param(PlannerInfo *root, RelOptInfo *rel); extern Param *assign_nestloop_param_var(PlannerInfo *root, Var *var); extern Param *assign_nestloop_param_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv); diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 2ae600f..22af824 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -139,6 +139,39 @@ explain (costs off) (4 rows) alter table tenk2 reset (parallel_workers); +--test parallel plan for a query containing initplan. +set enable_indexscan = off; +set enable_indexonlyscan = off; +set enable_bitmapscan = off; +alter table tenk2 set (parallel_workers = 2); +explain (costs off) + select count(*) from tenk1 where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + QUERY PLAN +------------------------------------------------------ + Aggregate + InitPlan 1 (returns $2) + -> Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Seq Scan on tenk2 + -> Gather + Workers Planned: 4 + Params Evaluated: $2 + -> Parallel Seq Scan on tenk1 + Filter: (unique1 = $2) +(12 rows) + +select count(*) from tenk1 where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + count +------- + 1 +(1 row) + +reset enable_indexscan; +reset enable_indexonlyscan; +reset enable_bitmapscan; +alter table tenk2 reset (parallel_workers); -- test parallel index scans. set enable_seqscan to off; set enable_bitmapscan to off; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 89fe80a..ae144ca 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -52,6 +52,21 @@ explain (costs off) (select ten from tenk2); alter table tenk2 reset (parallel_workers); +--test parallel plan for a query containing initplan. +set enable_indexscan = off; +set enable_indexonlyscan = off; +set enable_bitmapscan = off; +alter table tenk2 set (parallel_workers = 2); + +explain (costs off) + select count(*) from tenk1 where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); +select count(*) from tenk1 where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + +reset enable_indexscan; +reset enable_indexonlyscan; +reset enable_bitmapscan; +alter table tenk2 reset (parallel_workers); + -- test parallel index scans. set enable_seqscan to off; set enable_bitmapscan to off;