diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1585861..94c8507 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2049,11 +2049,6 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * inserts in general except for the cases where inserts generate a new * CommandId (eg. inserts into a table having a foreign key column). */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index af6afce..8c69931 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -517,6 +517,20 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. + * This must only be called at the start of a parallel operation. + */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId); + + currentCommandIdUsed = true; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. @@ -764,12 +778,13 @@ GetCurrentCommandId(bool used) if (used) { /* - * Forbid setting currentCommandIdUsed in a parallel worker, because - * we have no provision for communicating this back to the leader. We - * could relax this restriction when currentCommandIdUsed was already - * true at the start of the parallel operation. + * If in a parallel worker, only allow setting currentCommandIdUsed + * if currentCommandIdUsed was already true at the start of the + * parallel operation (by way of SetCurrentCommandIdUsed()), otherwise + * forbid setting currentCommandIdUsed because we have no provision + * for communicating this back to the leader. */ - Assert(!IsParallelWorker()); + Assert(!(IsParallelWorker() && !currentCommandIdUsed)); currentCommandIdUsed = true; } return currentCommandId; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2e27e26..0f3bd82 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -173,7 +173,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * against performing unsafe operations in parallel mode, but this gives a * more user-friendly error message. */ - if ((XactReadOnly || IsInParallelMode()) && + if ((XactReadOnly || (IsInParallelMode() && queryDesc->plannedstmt->commandType != CMD_INSERT)) && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 382e78f..2d6a200 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/xact.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -65,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PROCESSED_COUNT UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -173,18 +175,20 @@ ExecSerializePlan(Plan *plan, EState *estate) * PlannedStmt to start the executor. */ pstmt = makeNode(PlannedStmt); - pstmt->commandType = CMD_SELECT; + Assert(estate->es_plannedstmt->commandType == CMD_SELECT || + estate->es_plannedstmt->commandType == CMD_INSERT); + pstmt->commandType = (plan->type == T_ModifyTable) ? CMD_INSERT : CMD_SELECT; pstmt->queryId = UINT64CONST(0); - pstmt->hasReturning = false; - pstmt->hasModifyingCTE = false; + pstmt->hasReturning = estate->es_plannedstmt->hasReturning; + pstmt->hasModifyingCTE = estate->es_plannedstmt->hasModifyingCTE; pstmt->canSetTag = true; pstmt->transientPlan = false; pstmt->dependsOnRole = false; pstmt->parallelModeNeeded = false; pstmt->planTree = plan; pstmt->rtable = estate->es_range_table; - pstmt->resultRelations = NIL; - pstmt->rootResultRelations = NIL; + pstmt->resultRelations = estate->es_plannedstmt->resultRelations; + pstmt->rootResultRelations = estate->es_plannedstmt->rootResultRelations; pstmt->appendRelations = NIL; /* @@ -591,6 +595,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; + uint64 *processed_count_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -676,6 +681,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + if (planstate->plan->type == T_ModifyTable) + { + /* Estimate space for returned "# of tuples processed" count. */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + /* * Give parallel-aware nodes a chance to add to the estimates, and get a * count of how many PlanState nodes there are. @@ -765,6 +778,19 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* We don't need the TupleQueueReaders yet, though. */ pei->reader = NULL; + if (planstate->plan->type == T_ModifyTable) + { + /* Allocate space for each worker's returned "# of tuples processed" count. */ + processed_count_space = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, processed_count_space); + pei->processed_count = processed_count_space; + } + else + { + pei->processed_count = NULL; + } + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -1153,6 +1179,16 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + /* + * Update total # of tuples processed, using counts from each worker. + * This is currently done only in the case of parallel INSERT. + */ + if (pei->processed_count != NULL) + { + for (i = 0; i < nworkers; i++) + pei->planstate->state->es_processed += pei->processed_count[i]; + } + pei->finished = true; } @@ -1380,6 +1416,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; WalUsage *wal_usage; + uint64 *processed_count; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1401,6 +1438,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); + Assert(queryDesc->operation == CMD_SELECT || queryDesc->operation == CMD_INSERT); + if (queryDesc->operation == CMD_INSERT) + { + /* + * Record that the CurrentCommandId is used, at the start of + * the parallel operation. + */ + SetCurrentCommandIdUsedForWorker(); + } + /* Setting debug_query_string for individual workers */ debug_query_string = queryDesc->sourceText; @@ -1459,6 +1506,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); + if (queryDesc->operation == CMD_INSERT) + { + /* Report the # of tuples processed during parallel INSERT execution. */ + processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false); + processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed; + } + /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) ExecParallelReportInstrumentation(queryDesc->planstate, diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index a01b46a..dd8a2db 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -60,6 +60,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) GatherState *gatherstate; Plan *outerNode; TupleDesc tupDesc; + Index varno; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -104,7 +105,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags) * Initialize result type and projection. */ ExecInitResultTypeTL(&gatherstate->ps); - ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); + varno = (outerNode->type == T_ModifyTable && ((ModifyTable *)outerNode)->returningLists != NULL) ? + ((ModifyTableState *)outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR; + ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno); /* * Without projections result slot type is not trivially known, see @@ -166,6 +169,29 @@ ExecGather(PlanState *pstate) { ParallelContext *pcxt; + /* For parallel INSERT, assign FullTransactionId and CurrentCommandId, + * to be included in the transaction state that is serialized in the + * parallel DSM. We need to temporarily escape parallel mode in order + * for this to be possible. + * For parallel SELECT (as part of non-parallel INSERT), to avoid an + * attempt on INSERT to acquire the FullTransactionId whilst in + * parallel mode, we similarly assign the FullTransactionId here. + */ + if (outerPlanState(pstate)->type == T_ModifyTableState || + estate->es_plannedstmt->commandType == CMD_INSERT) + { + /* + * Assign FullTransactionId and CurrentCommandId, to be + * included in the transaction state that is serialized in the DSM. + */ + if (outerPlanState(pstate)->type == T_ModifyTableState) + GetCurrentCommandId(true); + Assert(IsInParallelMode()); + ExitParallelMode(); + GetCurrentFullTransactionId(); + EnterParallelMode(); + } + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index b399592..7e90df5 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -3903,6 +3903,36 @@ compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages, } /* + * Compute the number of parallel workers that should be used to insert into + * a relation. + * + * "max_workers" is caller's limit on the number of workers. This typically + * comes from a GUC. + */ +int +compute_parallel_insert_worker(RelOptInfo *rel, int max_workers) +{ + int parallel_workers = 0; + + /* + * If the user has set the parallel_workers reloption, use that; otherwise + * select a default number of workers. + */ + if (rel->rel_parallel_workers != -1) + parallel_workers = rel->rel_parallel_workers; + else + { + /* TODO - smarts for computing best no. of workers for insert */ + parallel_workers = max_workers; + } + + /* In no case use more than caller supplied maximum number of workers */ + parallel_workers = Min(parallel_workers, max_workers); + + return parallel_workers; +} + +/* * generate_partitionwise_join_paths * Create paths representing partitionwise join for given partitioned * join relation. diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index cd3716d..f34b4c5 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -179,6 +179,7 @@ static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static double relation_byte_size(double tuples, int width); static double page_size(double tuples, int width); static double get_parallel_divisor(Path *path); +static double get_modifytable_parallel_divisor(ModifyTablePath *path); /* @@ -203,6 +204,66 @@ clamp_row_est(double nrows) /* + * cost_modifytable + * Determines and returns the cost of a ModifyTable node. + */ +void +cost_modifytable(ModifyTablePath *path) +{ + double total_size; + double total_rows; + ListCell *lc; + + /* + * Compute cost & rowcount as sum of subpath costs & rowcounts. + */ + path->path.startup_cost = 0; + path->path.total_cost = 0; + path->path.rows = 0; + total_size = 0; + total_rows = 0; + foreach(lc, path->subpaths) + { + Path *subpath = (Path *) lfirst(lc); + + if (lc == list_head(path->subpaths)) /* first node? */ + path->path.startup_cost = subpath->startup_cost; + path->path.total_cost += subpath->total_cost; + total_rows += subpath->rows; + total_size += subpath->pathtarget->width * subpath->rows; + } + + /* Adjust costing for parallelism, if used. */ + if (path->path.parallel_workers > 0) + { + double parallel_divisor = get_modifytable_parallel_divisor(path); + + /* The total cost is divided among all the workers. */ + path->path.total_cost /= parallel_divisor; + + /* + * In the case of a parallel plan, the row count needs to represent + * the number of tuples processed per worker. + */ + path->path.rows = clamp_row_est(total_rows / parallel_divisor); + } + else + { + path->path.rows = total_rows; + } + + /* + * Set width to the average width of the subpath outputs. XXX this is + * totally wrong: we should report zero if no RETURNING, else an average + * of the RETURNING tlist widths. But it's what happened historically, + * and improving it is a task for another day. + */ + if (total_rows > 0) + total_size /= total_rows; + path->path.pathtarget->width = rint(total_size); +} + +/* * cost_seqscan * Determines and returns the cost of scanning a relation sequentially. * @@ -5737,6 +5798,29 @@ get_parallel_divisor(Path *path) } /* + * Divisor for ModifyTable (currently only Parallel Insert). + * Estimate the fraction of the work that each worker will do given the + * number of workers budgeted for the path. + * TODO: Needs revising based on further experience. + */ +static double +get_modifytable_parallel_divisor(ModifyTablePath *path) +{ + double parallel_divisor = path->path.parallel_workers; + + if (parallel_leader_participation && path->returningLists != NIL) + { + double leader_contribution; + + leader_contribution = 1.0 - (0.3 * path->path.parallel_workers); + if (leader_contribution > 0) + parallel_divisor += leader_contribution; + } + + return parallel_divisor; +} + +/* * compute_bitmap_pages * * compute number of pages fetched from heap in bitmap heap scan. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 99278ee..cb9a6ea25 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path) * top-level tlist seen at execution time. However, ModifyTable plan * nodes don't have a tlist matching the querytree targetlist. */ - if (!IsA(plan, ModifyTable)) + if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(plan->lefttree, ModifyTable))) apply_tlist_labeling(plan->targetlist, root->processed_tlist); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3e2b496..9b0f293 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -337,7 +337,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || parse->commandType == CMD_INSERT) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) @@ -371,6 +371,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * parallel-unsafe, or else the query planner itself has a bug. */ glob->parallelModeNeeded = glob->parallelModeOK && + (parse->commandType == CMD_SELECT) && (force_parallel_mode != FORCE_PARALLEL_OFF); /* Determine what fraction of the plan is likely to be scanned */ @@ -425,7 +426,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * Optionally add a Gather node for testing purposes, provided this is * actually a safe thing to do. */ - if (force_parallel_mode != FORCE_PARALLEL_OFF && top_plan->parallel_safe) + if (force_parallel_mode != FORCE_PARALLEL_OFF && parse->commandType == CMD_SELECT && top_plan->parallel_safe) { Gather *gather = makeNode(Gather); @@ -1797,7 +1798,8 @@ inheritance_planner(PlannerInfo *root) returningLists, rowMarks, NULL, - assign_special_exec_param(root))); + assign_special_exec_param(root), + 0)); } /*-------------------- @@ -1845,6 +1847,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *final_rel; FinalPathExtraData extra; ListCell *lc; + int parallel_insert_partial_path_count = 0; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -2381,13 +2384,97 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, returningLists, rowMarks, parse->onConflict, - assign_special_exec_param(root)); + assign_special_exec_param(root), + 0); } /* And shove it into final_rel */ add_path(final_rel, path); } + /* Consider Parallel INSERT */ + if (parse->commandType == CMD_INSERT && + !inheritance_update && + final_rel->consider_parallel && + parse->rowMarks == NIL) + { + Index rootRelation; + List *withCheckOptionLists; + List *returningLists; + int parallelInsertWorkers; + + parallelInsertWorkers = compute_parallel_insert_worker(current_rel, max_parallel_workers_per_gather); + + /* + * Generate partial paths for the final_rel. Insert all surviving paths, with + * Limit, and/or ModifyTable steps added if needed. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* + * If there is a LIMIT/OFFSET clause, add the LIMIT node. + */ + if (limit_needed(parse)) + { + path = (Path *) create_limit_path(root, final_rel, path, + parse->limitOffset, + parse->limitCount, + parse->limitOption, + offset_est, count_est); + } + + /* + * Add the ModifyTable node. + */ + + /* + * If target is a partition root table, we need to mark the + * ModifyTable node appropriately for that. + */ + if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == + RELKIND_PARTITIONED_TABLE) + rootRelation = parse->resultRelation; + else + rootRelation = 0; + + /* + * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if + * needed. + */ + if (parse->withCheckOptions) + withCheckOptionLists = list_make1(parse->withCheckOptions); + else + withCheckOptionLists = NIL; + + if (parse->returningList) + returningLists = list_make1(parse->returningList); + else + returningLists = NIL; + + path = (Path *) + create_modifytable_path(root, final_rel, + parse->commandType, + parse->canSetTag, + parse->resultRelation, + rootRelation, + false, + list_make1_int(parse->resultRelation), + list_make1(path), + list_make1(root), + withCheckOptionLists, + returningLists, + root->rowMarks, + parse->onConflict, + assign_special_exec_param(root), + parallelInsertWorkers); + + add_partial_path(final_rel, path); + parallel_insert_partial_path_count++; + } + } + /* * Generate partial paths for final_rel, too, if outer query levels might * be able to make use of them. @@ -2404,6 +2491,12 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + if (parallel_insert_partial_path_count > 0) + { + final_rel->rows = current_rel->rows; /* ??? why hasn't this been set above somewhere ???? */ + generate_useful_gather_paths(root, final_rel, false); + } + extra.limit_needed = limit_needed(parse); extra.limit_tuples = limit_tuples; extra.count_est = count_est; @@ -7573,7 +7666,43 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) - generate_useful_gather_paths(root, rel, false); + { + if (root->parse->commandType == CMD_INSERT) + { + Relation relation; + bool hasFKs; + RangeTblEntry *rte; + List *cachedFKs; + + /* Check if the target relation has foreign keys; if so, avoid + * creating a parallel Insert plan (because inserting into + * such tables would result in creation of new CommandIds, and + * this isn't supported by parallel workers). + * Similarly, avoid creating a parallel Insert plan if ON + * CONFLICT ... DO UPDATE ... has been specified, because + * parallel UPDATE is not supported. + * However, do allow any underlying query to be run by parallel + * workers in these cases. + */ + + rte = rt_fetch(root->parse->resultRelation, root->parse->rtable); + relation = table_open(rte->relid, NoLock); + cachedFKs = RelationGetFKeyList(relation); + hasFKs = cachedFKs != NIL; + table_close(relation, NoLock); + + if (hasFKs || (root->parse->onConflict != NULL && root->parse->onConflict->action == ONCONFLICT_UPDATE)) + { + generate_useful_gather_paths(root, rel, false); + /* Don't allow parallel insert */ + rel->consider_parallel = false; + } + } + else + { + generate_useful_gather_paths(root, rel, false); + } + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index dd8e2e9..062c5fc 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -252,6 +252,7 @@ set_plan_references(PlannerInfo *root, Plan *plan) PlannerGlobal *glob = root->glob; int rtoffset = list_length(glob->finalrtable); ListCell *lc; + Plan *finalPlan; /* * Add all the query's RTEs to the flattened rangetable. The live ones @@ -302,7 +303,16 @@ set_plan_references(PlannerInfo *root, Plan *plan) } /* Now fix the Plan tree */ - return set_plan_refs(root, plan, rtoffset); + finalPlan = set_plan_refs(root, plan, rtoffset); + if (finalPlan->type == T_Gather || finalPlan->type == T_GatherMerge) + { + Plan *subplan = outerPlan(finalPlan); + if (subplan->type == T_ModifyTable && ((ModifyTable *)subplan)->returningLists != NULL) + { + finalPlan->targetlist = outerPlan(finalPlan)->targetlist; + } + } + return finalPlan; } /* diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index c1fc866..4a9c3fa 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3538,11 +3538,11 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + int epqParam, + int parallel_workers) { + ListCell *lc; ModifyTablePath *pathnode = makeNode(ModifyTablePath); - double total_size; - ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); Assert(list_length(resultRelations) == list_length(subroots)); @@ -3557,45 +3557,22 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = rel->reltarget; /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; - pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; - pathnode->path.parallel_workers = 0; - pathnode->path.pathkeys = NIL; - - /* - * Compute cost & rowcount as sum of subpath costs & rowcounts. - * - * Currently, we don't charge anything extra for the actual table - * modification work, nor for the WITH CHECK OPTIONS or RETURNING - * expressions if any. It would only be window dressing, since - * ModifyTable is always a top-level node and there is no way for the - * costs to change any higher-level planning choices. But we might want - * to make it look better sometime. - */ - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - pathnode->path.rows = 0; - total_size = 0; - foreach(lc, subpaths) + pathnode->path.parallel_aware = parallel_workers > 0 ? true : false; + pathnode->path.parallel_safe = rel->consider_parallel; + if (rel->consider_parallel) { - Path *subpath = (Path *) lfirst(lc); - - if (lc == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - pathnode->path.rows += subpath->rows; - total_size += subpath->pathtarget->width * subpath->rows; + foreach (lc, subpaths) + { + Path *sp = (Path *)lfirst(lc); + if (!sp->parallel_safe) + { + pathnode->path.parallel_safe = false; + break; + } + } } - - /* - * Set width to the average width of the subpath outputs. XXX this is - * totally wrong: we should report zero if no RETURNING, else an average - * of the RETURNING tlist widths. But it's what happened historically, - * and improving it is a task for another day. - */ - if (pathnode->path.rows > 0) - total_size /= pathnode->path.rows; - pathnode->path.pathtarget->width = rint(total_size); + pathnode->path.parallel_workers = parallel_workers; + pathnode->path.pathkeys = NIL; pathnode->operation = operation; pathnode->canSetTag = canSetTag; @@ -3611,6 +3588,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + cost_modifytable(pathnode); + return pathnode; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index df1b43a..96295bc 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -385,6 +385,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void SetCurrentCommandIdUsedForWorker(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a39a5b..afb8a57 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ WalUsage *wal_usage; /* walusage area in DSM */ + uint64 *processed_count; /* processed tuple count area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6141654..fafa087 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); +extern void cost_modifytable(ModifyTablePath *path); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 715a24a..2d08f0c 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam); + int epqParam, + int parallel_workers); extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Node *limitOffset, Node *limitCount, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 10b6e81..1a01dcf 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -58,6 +58,8 @@ extern void generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows); extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages, int max_workers); +extern int compute_parallel_insert_worker(RelOptInfo *rel, + int max_workers); extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, Path *bitmapqual); extern void generate_partitionwise_join_paths(PlannerInfo *root,