From 90ee038cec103a85307711b861b431317f1cd5bf Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 14 Dec 2020 15:16:49 +0530 Subject: [PATCH v11] Tuple Cost Adjustment for Parallel Inserts in CTAS --- src/backend/commands/createas.c | 42 +++++++++++++++++- src/backend/commands/explain.c | 14 ++++-- src/backend/commands/prepare.c | 3 +- src/backend/optimizer/path/costsize.c | 22 +++++++++- src/backend/optimizer/plan/planner.c | 61 +++++++++++++++++++++++++++ src/include/commands/createas.h | 21 ++++++++- src/include/commands/explain.h | 3 +- src/include/nodes/parsenodes.h | 1 + 8 files changed, 158 insertions(+), 9 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 9e6c8fb2ba..3ffea41ea6 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -316,6 +316,13 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, query = linitial_node(Query, rewritten); Assert(query->commandType == CMD_SELECT); + /* + * Indication to the planner that the SELECT is from CTAS so that it + * can adjust the parallel tuple cost if possible. + */ + if (IsParallelInsertInCTASAllowed(into, NULL, NULL)) + query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT; + /* plan the query */ plan = pg_plan_query(query, pstate->p_sourcetext, CURSOR_OPT_PARALLEL_OK, params); @@ -344,7 +351,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * into the target table. We need plan state to be initialized by the * executor to decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc)) + if (IsParallelInsertInCTASAllowed(into, queryDesc, + &query->CTASParallelInsInfo)) SetCTASParallelInsertState(queryDesc); /* run the plan to completion */ @@ -659,7 +667,8 @@ intorel_destroy(DestReceiver *self) * IsParallelInsertInCTASAllowed --- determine whether or not parallel * insertion is possible. */ -bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc) +bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc, + uint8 *tuple_cost_flags) { if (!IS_CTAS(into)) return false; @@ -678,6 +687,7 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc) { PlanState *ps = queryDesc->planstate; bool allow; + bool need_to_assert = false; /* * We allow parallel inserts by the workers only if the Gather node has @@ -690,6 +700,34 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc) */ allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo; + /* + * It should not happen that in cost_gather we have ignored the + * parallel tuple cost and now we are not allowing the parallel + * inserts. And also we might need assertion only if the top node is + * GatherState. Because the main intention of assertion is to check if + * we enforced planner to ignore the parallel tuple cost (with the + * intention of choosing parallel inserts) due to which + * the parallel plan was chosen, but we do not allow the parallel + * inserts now. + */ + if (!allow && tuple_cost_flags && ps && IsA(ps, GatherState)) + need_to_assert = true; + + if (need_to_assert) + { + /* + * If we have correctly ignored parallel tuple cost in planner + * while creating Gather path, then this assertion failure should + * not occur. If it occurs, that means the planner may have chosen + * this parallel plan because of our enforcement to ignore the + * parallel tuple cost. + */ + Assert(!(*tuple_cost_flags & CTAS_PARALLEL_INS_TUP_COST_IGNORED)); + } + + if (tuple_cost_flags) + *tuple_cost_flags = CTAS_PARALLEL_INS_UNDEF; + return allow; } diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 03ac29cd64..d0152deba7 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -387,6 +387,13 @@ ExplainOneQuery(Query *query, int cursorOptions, bufusage_start = pgBufferUsage; INSTR_TIME_SET_CURRENT(planstart); + /* + * Indication to the planner that the SELECT is from CTAS so that it + * can adjust the parallel tuple cost if possible. + */ + if (IsParallelInsertInCTASAllowed(into, NULL, NULL)) + query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT; + /* plan the query */ plan = pg_plan_query(query, queryString, cursorOptions, params); @@ -402,7 +409,8 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + &query->CTASParallelInsInfo); } } @@ -496,7 +504,7 @@ void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage) + const BufferUsage *bufusage, uint8 *ctas_tuple_cost_flags) { DestReceiver *dest; QueryDesc *queryDesc; @@ -562,7 +570,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * target table. We need plan state to be initialized by the executor to * decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc)) + if (IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags)) SetCTASParallelInsertState(queryDesc); /* Execute the plan for statistics if asked for */ diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 4b18be5b27..12227b6e79 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -674,7 +674,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, if (pstmt->commandType != CMD_UTILITY) ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + NULL); else ExplainOneUtility(pstmt->utilityStmt, into, es, query_string, paramLI, queryEnv); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 22d6935824..800f25903d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -76,6 +76,7 @@ #include "access/amapi.h" #include "access/htup_details.h" #include "access/tsmapi.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" @@ -378,6 +379,7 @@ cost_gather(GatherPath *path, PlannerInfo *root, { Cost startup_cost = 0; Cost run_cost = 0; + bool ignore_tuple_cost = false; /* Mark the path with the correct row estimate */ if (rows) @@ -393,7 +395,25 @@ cost_gather(GatherPath *path, PlannerInfo *root, /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; - run_cost += parallel_tuple_cost * path->path.rows; + + /* + * Do not consider tuple cost in case of we intend to perform parallel + * inserts by workers. We would have set ignore flag in + * apply_scanjoin_target_to_paths before generating Gather path for the + * upper level SELECT part of the CTAS. + */ + if ((root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + ignore_tuple_cost = true; + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_IGNORED; + } + + if (!ignore_tuple_cost) + run_cost += parallel_tuple_cost * path->path.rows; path->path.startup_cost = startup_cost; path->path.total_cost = (startup_cost + run_cost); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1a94b58f8b..d287b6bfbb 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -28,6 +28,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -7338,6 +7339,45 @@ can_partial_agg(PlannerInfo *root) return true; } +/* + * ignore_parallel_tuple_cost + * + * Gather node will not receive any tuples from the workers in case each worker + * inserts them in parallel. So, we set a flag to ignore parallel tuple cost by + * the Gather path in cost_gather if the SELECT is for CTAS and we are + * generating an upper level Gather path. +*/ +static bool +ignore_parallel_tuple_cost(PlannerInfo *root) +{ + if (root->query_level == 1 && + (root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT)) + { + /* + * In each of following cases, a parent path will be generated for the + * upper Gather path(in grouping_planner), in which case we can not + * let parallel inserts happen. So we do not set ignore tuple cost + * flag. + */ + if (root->parse->rowMarks || + limit_needed(root->parse) || + root->parse->sortClause || + root->parse->distinctClause || + root->parse->hasWindowFuncs || + root->parse->groupClause || + root->parse->groupingSets || + root->parse->hasAggs || + root->hasHavingQual) + return false; + + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + + return true; + } + + return false; +} + /* * apply_scanjoin_target_to_paths * @@ -7557,8 +7597,29 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) + { + /* + * Set a flag to ignore parallel tuple cost by the Gather path in + * cost_gather if the SELECT is for CTAS and we are generating an upper + * level Gather path. + */ + bool ignore = ignore_parallel_tuple_cost(root); + generate_useful_gather_paths(root, rel, false); + /* + * Reset the ignore flag, in case we set it but + * generate_useful_gather_paths returned without reaching cost_gather. + */ + if (ignore && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + } + } + /* * Reassess which paths are the cheapest, now that we've potentially added * new Gather (or Gather Merge) and/or Append (or MergeAppend) paths to diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index ab3aab58c5..e01a6152ce 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -39,6 +39,24 @@ typedef struct Oid object_id; } DR_intorel; +/* + * Information sent to the planner from CTAS to account for the cost + * calculations in cost_gather. We need to do this because, no tuples will be + * received by the Gather node if the workers insert the tuples in parallel. + */ +typedef enum CTASParallelInsertOpt +{ + CTAS_PARALLEL_INS_UNDEF = 0, /* undefined */ + CTAS_PARALLEL_INS_SELECT = 1 << 0, /* set to this before planning */ + /* + * Set to this while planning for upper Gather path to ignore parallel + * tuple cost in cost_gather. + */ + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN = 1 << 1, + + CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2 +} CTASParallelInsertOpt; + #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause)) #define IS_PARALLEL_CTAS_DEST(dest) (dest && dest->mydest == DestIntoRel && \ IS_CTAS(((DR_intorel *) dest)->into) && \ @@ -53,7 +71,8 @@ extern int GetIntoRelEFlags(IntoClause *intoClause); extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause); extern bool IsParallelInsertInCTASAllowed(IntoClause *into, - QueryDesc *queryDesc); + QueryDesc *queryDesc, + uint8 *tuple_cost_flags); extern void SetCTASParallelInsertState(QueryDesc *queryDesc); diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h index ba661d32a6..1a1806dbf1 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h @@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage); + const BufferUsage *bufusage, + uint8 *ctas_tuple_cost_flags); extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc); extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 48a79a7657..81b148c383 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -180,6 +180,7 @@ typedef struct Query */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ + uint8 CTASParallelInsInfo; /* parallel insert in CTAS info */ } Query; -- 2.25.1