From 1a4a12c50d796916824ea728fe4ab4a1f2625c26 Mon Sep 17 00:00:00 2001 From: Greg Nancarrow Date: Fri, 11 Dec 2020 17:19:10 +1100 Subject: [PATCH v11 3/4] Enable parallel INSERT and/or SELECT for "INSERT INTO ... SELECT ...", where it is safe to do so. Parallel INSERT can't be utilized in the following cases: - A parallel query plan can't be generated for the underlying SELECT, because, for example, the SELECT statement uses a parallel-unsafe function - INSERT statement uses the ON CONFLICT DO UPDATE clause - Target table is a foreign or temporary table - Target table has a foreign key, or a parallel-unsafe trigger, index expression, column default expression or check constraint - Target table is a partitioned table with a parallel-unsafe partition key expression or support function Where the above-mentioned target table features are found to be, at worst, parallel-restricted, rather than parallel-unsafe, at least parallel SELECT may be utilized for the INSERT's query plan. Some prior work (85f6b49 and 3ba59cc) has already been done to establish the necessary infrastructure to allow parallel INSERTs, in general, to be safe, except for cases where new commandIds would be generated in the parallel-worker code (such as inserts into a table having a foreign key) - these cases need to be avoided. The planner is updated with additional changes that build upon those made to support parallel SELECT for "INSERT INTO ... SELECT ...". Where Gather paths are normally generated for parallel SELECT, in the case of an "INSERT INTO ... SELECT ...", these Gather paths are now generated only if the parallel-safety level is found to be RESTRICTED (resulting in non-parallel INSERT with parallel SELECT), otherwise if the parallel-safety level is found to be parallel SAFE, then partial paths for parallel INSERT are generated before Gather paths are added (resulting in parallel INSERT+SELECT). The executor is updated for Gather and ModifyTable node processing, to handle parallel INSERT, by only starting tuple queue readers if there is a RETURNING clause, and by firing any before/after statement triggers in the leader (and preventing them from being fired in the workers). The handling of the currentCommandId is updated to set it as used in the leader prior to entering parallel-mode for parallel INSERT, and to record it as used at the start of the parallel INSERT operation in the worker. The parallel-worker framework is updated to support serialization of an INSERT planned statement, to be passed to the workers, and to support return of the number of tuples processed (INSERTed) by the workers, for the executor state. Note that this commit changes the RELATION_IS_LOCAL() macro (typically used to decide whether we can skip acquiring locks), as now a relation created in the current transaction can no longer be assumed to be accessible only to the current backend, as it may be accessible to parallel workers. Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com --- src/backend/access/heap/heapam.c | 30 ++++++- src/backend/access/transam/xact.c | 33 ++++++-- src/backend/executor/execMain.c | 7 +- src/backend/executor/execParallel.c | 65 ++++++++++++++- src/backend/executor/nodeGather.c | 69 +++++++++++++--- src/backend/executor/nodeModifyTable.c | 44 +++++++++- src/backend/optimizer/path/costsize.c | 46 +++++++++++ src/backend/optimizer/plan/createplan.c | 2 +- src/backend/optimizer/plan/planner.c | 140 ++++++++++++++++++++++++++++++-- src/backend/optimizer/plan/setrefs.c | 13 ++- src/backend/optimizer/util/pathnode.c | 56 ++++--------- src/include/access/xact.h | 3 +- src/include/executor/execParallel.h | 1 + src/include/executor/nodeModifyTable.h | 3 +- src/include/nodes/execnodes.h | 3 +- src/include/optimizer/cost.h | 1 + src/include/optimizer/pathnode.h | 3 +- src/include/utils/rel.h | 9 +- 18 files changed, 445 insertions(+), 83 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index a9583f3..53df895 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -52,6 +52,9 @@ #include "access/xloginsert.h" #include "access/xlogutils.h" #include "catalog/catalog.h" +#ifdef USE_ASSERT_CHECKING +#include "commands/trigger.h" +#endif #include "miscadmin.h" #include "pgstat.h" #include "port/atomics.h" @@ -2049,10 +2052,31 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * inserts in general except for the cases where inserts generate a new * CommandId (eg. inserts into a table having a foreign key column). */ +#ifdef USE_ASSERT_CHECKING if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); + { + /* + * Assert that for this relation, no trigger of type RI_TRIGGER_FK + * exists, as it would indicate that the relation has a FK column, + * which would, on insert, result in creation of a new CommandId, + * and this isn't currently supported in a parallel worker. + */ + TriggerDesc *trigdesc = relation->trigdesc; + if (trigdesc != NULL) + { + int i; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + int trigtype; + Trigger *trigger = &trigdesc->triggers[i]; + + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + Assert(trigtype != RI_TRIGGER_FK); + } + } + } +#endif tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0c28a46..bad6a12 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -518,6 +518,20 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. + * This must only be called at the start of a parallel operation. + */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId); + + currentCommandIdUsed = true; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. @@ -765,12 +779,16 @@ GetCurrentCommandId(bool used) if (used) { /* - * Forbid setting currentCommandIdUsed in a parallel worker, because - * we have no provision for communicating this back to the leader. We - * could relax this restriction when currentCommandIdUsed was already - * true at the start of the parallel operation. + * If in a parallel worker, only allow setting currentCommandIdUsed if + * currentCommandIdUsed was already true at the start of the parallel + * operation (by way of SetCurrentCommandIdUsed()), otherwise forbid + * setting currentCommandIdUsed because we have no provision for + * communicating this back to the leader. Once currentCommandIdUsed is + * set, the commandId used by leader and workers can't be changed, + * because CommandCounterIncrement() then prevents any attempted + * increment of the current commandId. */ - Assert(!IsParallelWorker()); + Assert(!(IsParallelWorker() && !currentCommandIdUsed)); currentCommandIdUsed = true; } return currentCommandId; @@ -1021,12 +1039,15 @@ IsInParallelMode(void) * Prepare for entering parallel mode, based on command-type. */ void -PrepareParallelMode(CmdType commandType) +PrepareParallelMode(CmdType commandType, bool isParallelModifyLeader) { Assert(!IsInParallelMode() || force_parallel_mode != FORCE_PARALLEL_OFF); if (IsModifySupportedInParallelMode(commandType)) { + if (isParallelModifyLeader) + (void) GetCurrentCommandId(true); + /* * Prepare for entering parallel mode by assigning a * FullTransactionId, to be included in the transaction state that is diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2bb74d4..d277e02 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -790,7 +790,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) PreventCommandIfReadOnly(CreateCommandName((Node *) plannedstmt)); } - if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + if ((plannedstmt->commandType != CMD_SELECT && + !IsModifySupportedInParallelMode(plannedstmt->commandType)) || plannedstmt->hasModifyingCTE) PreventCommandIfParallelMode(CreateCommandName((Node *) plannedstmt)); } @@ -1527,7 +1528,9 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) { - PrepareParallelMode(estate->es_plannedstmt->commandType); + bool isParallelModifyLeader = IsA(planstate, GatherState) && IsA(outerPlanState(planstate), ModifyTableState); + + PrepareParallelMode(estate->es_plannedstmt->commandType, isParallelModifyLeader); EnterParallelMode(); } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index befde52..2f3a75b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/xact.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -65,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PROCESSED_COUNT UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -173,9 +175,11 @@ ExecSerializePlan(Plan *plan, EState *estate) * PlannedStmt to start the executor. */ pstmt = makeNode(PlannedStmt); - pstmt->commandType = CMD_SELECT; + Assert(estate->es_plannedstmt->commandType == CMD_SELECT || + IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType)); + pstmt->commandType = IsA(plan, ModifyTable) ? castNode(ModifyTable, plan)->operation : CMD_SELECT; pstmt->queryId = UINT64CONST(0); - pstmt->hasReturning = false; + pstmt->hasReturning = estate->es_plannedstmt->hasReturning; pstmt->hasModifyingCTE = false; pstmt->canSetTag = true; pstmt->transientPlan = false; @@ -183,7 +187,7 @@ ExecSerializePlan(Plan *plan, EState *estate) pstmt->parallelModeNeeded = false; pstmt->planTree = plan; pstmt->rtable = estate->es_range_table; - pstmt->resultRelations = NIL; + pstmt->resultRelations = estate->es_plannedstmt->resultRelations; pstmt->appendRelations = NIL; /* @@ -590,6 +594,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; + uint64 *processed_count_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -675,6 +680,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + if (IsA(planstate->plan, ModifyTable)) + { + /* Estimate space for returned "# of tuples processed" count. */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + /* * Give parallel-aware nodes a chance to add to the estimates, and get a * count of how many PlanState nodes there are. @@ -764,6 +777,22 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* We don't need the TupleQueueReaders yet, though. */ pei->reader = NULL; + if (IsA(planstate->plan, ModifyTable)) + { + /* + * Allocate space for each worker's returned "# of tuples processed" + * count. + */ + processed_count_space = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, processed_count_space); + pei->processed_count = processed_count_space; + } + else + { + pei->processed_count = NULL; + } + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -1152,6 +1181,15 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + /* + * Update total # of tuples processed, using counts from each worker. + */ + if (pei->processed_count != NULL) + { + for (i = 0; i < nworkers; i++) + pei->planstate->state->es_processed += pei->processed_count[i]; + } + pei->finished = true; } @@ -1379,6 +1417,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; WalUsage *wal_usage; + uint64 *processed_count; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1400,6 +1439,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); + Assert(queryDesc->operation == CMD_SELECT || IsModifySupportedInParallelMode(queryDesc->operation)); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* + * Record that the CurrentCommandId is used, at the start of the + * parallel operation. + */ + SetCurrentCommandIdUsedForWorker(); + } + /* Setting debug_query_string for individual workers */ debug_query_string = queryDesc->sourceText; @@ -1458,6 +1507,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* + * Report the # of tuples processed during execution of a parallel + * table-modification command. + */ + processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false); + processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed; + } + /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) ExecParallelReportInstrumentation(queryDesc->planstate, diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index a01b46a..c249195 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -35,6 +35,7 @@ #include "executor/execdebug.h" #include "executor/execParallel.h" #include "executor/nodeGather.h" +#include "executor/nodeModifyTable.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" #include "miscadmin.h" @@ -60,6 +61,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) GatherState *gatherstate; Plan *outerNode; TupleDesc tupDesc; + Index varno; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -104,7 +106,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags) * Initialize result type and projection. */ ExecInitResultTypeTL(&gatherstate->ps); - ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); + varno = (IsA(outerNode, ModifyTable) && castNode(ModifyTable, outerNode)->returningLists != NULL) ? + castNode(ModifyTableState, outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR; + ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno); /* * Without projections result slot type is not trivially known, see @@ -144,9 +148,19 @@ ExecGather(PlanState *pstate) GatherState *node = castNode(GatherState, pstate); TupleTableSlot *slot; ExprContext *econtext; + ModifyTableState *nodeModifyTableState = NULL; + bool isParallelModifyLeader = false; + bool isParallelModifyWithReturning = false; CHECK_FOR_INTERRUPTS(); + if (IsA(outerPlanState(pstate), ModifyTableState)) + { + nodeModifyTableState = castNode(ModifyTableState, outerPlanState(pstate)); + isParallelModifyLeader = IsModifySupportedInParallelMode(nodeModifyTableState->operation); + isParallelModifyWithReturning = isParallelModifyLeader && nodeModifyTableState->ps.plan->targetlist != NIL; + } + /* * Initialize the parallel context and workers on first execution. We do * this on first execution rather than during node initialization, as it @@ -178,6 +192,16 @@ ExecGather(PlanState *pstate) node->pei, gather->initParam); + if (isParallelModifyLeader) + { + /* + * For a supported parallel table-modification command, if + * there are BEFORE STATEMENT triggers, these must be fired by + * the leader, not by the parallel workers. + */ + fireBSTriggersInLeader(nodeModifyTableState); + } + /* * Register backend workers. We might not get as many as we * requested, or indeed any at all. @@ -188,7 +212,7 @@ ExecGather(PlanState *pstate) node->nworkers_launched = pcxt->nworkers_launched; /* Set up tuple queue readers to read the results. */ - if (pcxt->nworkers_launched > 0) + if (pcxt->nworkers_launched > 0 && !(isParallelModifyLeader && !isParallelModifyWithReturning)) { ExecParallelCreateReaders(node->pei); /* Make a working array showing the active readers */ @@ -200,7 +224,11 @@ ExecGather(PlanState *pstate) } else { - /* No workers? Then never mind. */ + /* + * No workers were launched, or this is a supported parallel + * table-modification command without a RETURNING clause - no + * readers are required. + */ node->nreaders = 0; node->reader = NULL; } @@ -208,7 +236,7 @@ ExecGather(PlanState *pstate) } /* Run plan locally if no workers or enabled and not single-copy. */ - node->need_to_scan_locally = (node->nreaders == 0) + node->need_to_scan_locally = (node->nworkers_launched <= 0) || (!gather->single_copy && parallel_leader_participation); node->initialized = true; } @@ -229,7 +257,7 @@ ExecGather(PlanState *pstate) return NULL; /* If no projection is required, we're done. */ - if (node->ps.ps_ProjInfo == NULL) + if (node->ps.ps_ProjInfo == NULL || isParallelModifyWithReturning) return slot; /* @@ -418,14 +446,35 @@ ExecShutdownGatherWorkers(GatherState *node) void ExecShutdownGather(GatherState *node) { - ExecShutdownGatherWorkers(node); + bool isParallelModifyLeader; - /* Now destroy the parallel context. */ - if (node->pei != NULL) + /* + * If the parallel context has already been destroyed, this function must + * have been previously called, so just return. + */ + if (node->pei == NULL) + return; + + isParallelModifyLeader = IsA(outerPlanState(node), ModifyTableState) && + IsModifySupportedInParallelMode(castNode(ModifyTableState, outerPlanState(node))->operation); + + if (isParallelModifyLeader) { - ExecParallelCleanup(node->pei); - node->pei = NULL; + /* + * For a supported parallel table-modification command, if there are + * AFTER STATEMENT triggers, these must be fired by the leader, not by + * the parallel workers. + */ + ModifyTableState *nodeModifyTableState = castNode(ModifyTableState, outerPlanState(node)); + + fireASTriggersInLeader(nodeModifyTableState); } + + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. */ + ExecParallelCleanup(node->pei); + node->pei = NULL; } /* ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index e0f2428..58bf07c 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -39,6 +39,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/tableam.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -1830,6 +1831,39 @@ fireASTriggers(ModifyTableState *node) } /* + * Process BEFORE EACH STATEMENT triggers, in the leader + */ +void +fireBSTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + if (node->fireBSTriggers) + { + fireBSTriggers(node); + node->fireBSTriggers = false; + + /* + * Disable firing of AFTER STATEMENT triggers by local plan execution + * (ModifyTable processing). These will be fired at the end of Gather + * processing. + */ + node->fireASTriggers = false; + } +} + +/* + * Process AFTER EACH STATEMENT triggers, in the leader + */ +void +fireASTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + fireASTriggers(node); +} + +/* * Set up the state needed for collecting transition tuples for AFTER * triggers. */ @@ -2155,7 +2189,11 @@ ExecModifyTable(PlanState *pstate) /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ - fireASTriggers(node); + if (node->fireASTriggers) + { + fireASTriggers(node); + node->fireASTriggers = false; + } node->mt_done = true; @@ -2232,7 +2270,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* set up epqstate with dummy subplan data for the moment */ EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam); - mtstate->fireBSTriggers = true; + /* Statement-level triggers must not be fired by parallel workers */ + mtstate->fireBSTriggers = !IsParallelWorker(); + mtstate->fireASTriggers = !IsParallelWorker(); /* * Build state for collecting transition tuples. This requires having a diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 22d6935..52ecb3a 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -213,6 +213,52 @@ clamp_row_est(double nrows) /* + * cost_modifytable + * Determines and returns the cost of a ModifyTable node. + */ +void +cost_modifytable(ModifyTablePath *path) +{ + double total_size; + ListCell *lc; + + /* + * Compute cost & rowcount as sum of subpath costs & rowcounts. + * + * Currently, we don't charge anything extra for the actual table + * modification work, nor for the WITH CHECK OPTIONS or RETURNING + * expressions if any. + */ + path->path.startup_cost = 0; + path->path.total_cost = 0; + path->path.rows = 0; + total_size = 0; + foreach(lc, path->subpaths) + { + Path *subpath = (Path *) lfirst(lc); + + if (lc == list_head(path->subpaths)) /* first node? */ + path->path.startup_cost = subpath->startup_cost; + path->path.total_cost += subpath->total_cost; + if (path->returningLists != NIL) + { + path->path.rows += subpath->rows; + total_size += subpath->pathtarget->width * subpath->rows; + } + } + + /* + * Set width to the average width of the subpath outputs. XXX this is + * totally wrong: we should return an average of the RETURNING tlist + * widths. But it's what happened historically, and improving it is a + * task for another day. + */ + if (path->path.rows > 0) + total_size /= path->path.rows; + path->path.pathtarget->width = rint(total_size); +} + +/* * cost_seqscan * Determines and returns the cost of scanning a relation sequentially. * diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 40abe6f..b08db29 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path) * top-level tlist seen at execution time. However, ModifyTable plan * nodes don't have a tlist matching the querytree targetlist. */ - if (!IsA(plan, ModifyTable)) + if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(outerPlan(plan), ModifyTable))) apply_tlist_labeling(plan->targetlist, root->processed_tlist); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 00e7388..81d70e1 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -320,12 +320,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * GUCs are set to values that don't permit parallelism, or if * parallel-unsafe functions are present in the query tree. * + * * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as - * of now, only the leader backend writes into a completely new table. In - * the future, we can extend it to allow workers to write into the table. - * However, to allow parallel updates and deletes, we have to solve other - * problems, especially around combo CIDs.) + * of now, only INSERT INTO...SELECT employs workers to write into the + * table, while for the other cases only the leader backend writes into a + * completely new table. In the future, we can extend it to allow workers + * for more cases. However, to allow parallel updates and deletes, we have + * to solve other problems, especially around combo CIDs.) * * For now, we don't try to use parallel mode if we're running inside a * parallel worker. We might eventually be able to relax this @@ -1805,7 +1807,8 @@ inheritance_planner(PlannerInfo *root) returningLists, rowMarks, NULL, - assign_special_exec_param(root))); + assign_special_exec_param(root), + 0)); } /*-------------------- @@ -1853,6 +1856,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *final_rel; FinalPathExtraData extra; ListCell *lc; + int parallel_modify_partial_path_count = 0; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -2378,13 +2382,102 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, returningLists, rowMarks, parse->onConflict, - assign_special_exec_param(root)); + assign_special_exec_param(root), + 0); } /* And shove it into final_rel */ add_path(final_rel, path); } + /* Consider a supported parallel table-modification command */ + if (IsModifySupportedInParallelMode(parse->commandType) && + !inheritance_update && + final_rel->consider_parallel && + parse->rowMarks == NIL) + { + Index rootRelation; + List *withCheckOptionLists; + List *returningLists; + int parallelModifyWorkers; + + /* + * Generate partial paths for the final_rel. Insert all surviving + * paths, with Limit, and/or ModifyTable steps added if needed. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* + * If there is a LIMIT/OFFSET clause, add the LIMIT node. + */ + if (limit_needed(parse)) + { + path = (Path *) create_limit_path(root, final_rel, path, + parse->limitOffset, + parse->limitCount, + parse->limitOption, + offset_est, count_est); + } + + /* + * Add the ModifyTable node. + */ + + /* + * If target is a partition root table, we need to mark the + * ModifyTable node appropriately for that. + */ + if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == + RELKIND_PARTITIONED_TABLE) + rootRelation = parse->resultRelation; + else + rootRelation = 0; + + /* + * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if + * needed. + */ + if (parse->withCheckOptions) + withCheckOptionLists = list_make1(parse->withCheckOptions); + else + withCheckOptionLists = NIL; + + if (parse->returningList) + returningLists = list_make1(parse->returningList); + else + returningLists = NIL; + + /* + * For the number of workers to use for a parallel + * INSERT/UPDATE/DELETE, it seems resonable to use the same number + * of workers as estimated for the underlying query. + */ + parallelModifyWorkers = path->parallel_workers; + + path = (Path *) + create_modifytable_path(root, final_rel, + parse->commandType, + parse->canSetTag, + parse->resultRelation, + rootRelation, + false, + list_make1_int(parse->resultRelation), + list_make1(path), + list_make1(root), + withCheckOptionLists, + returningLists, + root->rowMarks, + parse->onConflict, + assign_special_exec_param(root), + parallelModifyWorkers); + + add_partial_path(final_rel, path); + parallel_modify_partial_path_count++; + } + } + /* * Generate partial paths for final_rel, too, if outer query levels might * be able to make use of them. @@ -2401,6 +2494,13 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + if (parallel_modify_partial_path_count > 0) + { + final_rel->rows = current_rel->rows; /* ??? why hasn't this been + * set above somewhere ???? */ + generate_useful_gather_paths(root, final_rel, false); + } + extra.limit_needed = limit_needed(parse); extra.limit_tuples = limit_tuples; extra.count_est = count_est; @@ -7570,7 +7670,33 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) - generate_useful_gather_paths(root, rel, false); + { + if (IsModifySupportedInParallelMode(root->parse->commandType)) + { + Assert(root->glob->parallelModeOK); + if (root->glob->maxParallelHazard != PROPARALLEL_SAFE) + { + /* + * Don't allow a supported parallel table-modification + * command, because it's not safe. + */ + if (root->glob->maxParallelHazard == PROPARALLEL_RESTRICTED) + { + /* + * However, do allow any underlying query to be run by + * parallel workers. + */ + generate_useful_gather_paths(root, rel, false); + } + rel->partial_pathlist = NIL; + rel->consider_parallel = false; + } + } + else + { + generate_useful_gather_paths(root, rel, false); + } + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 127ea3d..c0e6a62 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -252,6 +252,7 @@ set_plan_references(PlannerInfo *root, Plan *plan) PlannerGlobal *glob = root->glob; int rtoffset = list_length(glob->finalrtable); ListCell *lc; + Plan *finalPlan; /* * Add all the query's RTEs to the flattened rangetable. The live ones @@ -302,7 +303,17 @@ set_plan_references(PlannerInfo *root, Plan *plan) } /* Now fix the Plan tree */ - return set_plan_refs(root, plan, rtoffset); + finalPlan = set_plan_refs(root, plan, rtoffset); + if (finalPlan != NULL && IsA(finalPlan, Gather)) + { + Plan *subplan = outerPlan(finalPlan); + + if (IsA(subplan, ModifyTable) && castNode(ModifyTable, subplan)->returningLists != NULL) + { + finalPlan->targetlist = copyObject(subplan->targetlist); + } + } + return finalPlan; } /* diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5147895..2b55f7f 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3528,6 +3528,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, * 'rowMarks' is a list of PlanRowMarks (non-locking only) * 'onconflict' is the ON CONFLICT clause, or NULL * 'epqParam' is the ID of Param for EvalPlanQual re-eval + * 'parallelWorkers' is the no. of parallel workers to use */ ModifyTablePath * create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, @@ -3538,10 +3539,10 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + int epqParam, + int parallelWorkers) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); - double total_size; ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); @@ -3558,47 +3559,22 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; - pathnode->path.parallel_workers = 0; - pathnode->path.pathkeys = NIL; - - /* - * Compute cost & rowcount as sum of subpath costs & rowcounts. - * - * Currently, we don't charge anything extra for the actual table - * modification work, nor for the WITH CHECK OPTIONS or RETURNING - * expressions if any. It would only be window dressing, since - * ModifyTable is always a top-level node and there is no way for the - * costs to change any higher-level planning choices. But we might want - * to make it look better sometime. - */ - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - pathnode->path.rows = 0; - total_size = 0; - foreach(lc, subpaths) + pathnode->path.parallel_safe = rel->consider_parallel && parallelWorkers > 0; + if (pathnode->path.parallel_safe) { - Path *subpath = (Path *) lfirst(lc); - - if (lc == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - if (returningLists != NIL) + foreach(lc, subpaths) { - pathnode->path.rows += subpath->rows; - total_size += subpath->pathtarget->width * subpath->rows; + Path *sp = (Path *) lfirst(lc); + + if (!sp->parallel_safe) + { + pathnode->path.parallel_safe = false; + break; + } } } - - /* - * Set width to the average width of the subpath outputs. XXX this is - * totally wrong: we should return an average of the RETURNING tlist - * widths. But it's what happened historically, and improving it is a task - * for another day. - */ - if (pathnode->path.rows > 0) - total_size /= pathnode->path.rows; - pathnode->path.pathtarget->width = rint(total_size); + pathnode->path.parallel_workers = parallelWorkers; + pathnode->path.pathkeys = NIL; pathnode->operation = operation; pathnode->canSetTag = canSetTag; @@ -3614,6 +3590,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + cost_modifytable(pathnode); + return pathnode; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 591672c..8bd3506 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -386,6 +386,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void SetCurrentCommandIdUsedForWorker(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); @@ -466,7 +467,7 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); -extern void PrepareParallelMode(CmdType commandType); +extern void PrepareParallelMode(CmdType commandType, bool isParallelModifyLeader); /* * IsModifySupportedInParallelMode diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a39a5b..0534544 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ WalUsage *wal_usage; /* walusage area in DSM */ + uint64 *processed_count; /* processed tuple count area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 46a2dc9..e332482 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -22,5 +22,6 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); - +extern void fireBSTriggersInLeader(ModifyTableState *node); +extern void fireASTriggersInLeader(ModifyTableState *node); #endif /* NODEMODIFYTABLE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 61ba4c3..e576038 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1164,7 +1164,8 @@ typedef struct ModifyTableState List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ - bool fireBSTriggers; /* do we need to fire stmt triggers? */ + bool fireBSTriggers; /* do we need to fire before stmt triggers? */ + bool fireASTriggers; /* do we need to fire after stmt triggers? */ /* * Slot for storing tuples in the root partitioned table's rowtype during diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 8e621d2..a2ae2f6 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); +extern void cost_modifytable(ModifyTablePath *path); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 3bd7072..77011bc 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam); + int epqParam, + int parallel_workers); extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Node *limitOffset, Node *limitCount, diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index c5ffea4..be8f1a0 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -575,15 +575,14 @@ typedef struct ViewOptions /* * RELATION_IS_LOCAL - * If a rel is either temp or newly created in the current transaction, - * it can be assumed to be accessible only to the current backend. - * This is typically used to decide that we can skip acquiring locks. + * If a rel is temp, it can be assumed to be accessible only to the + * current backend. This is typically used to decide that we can + * skip acquiring locks. * * Beware of multiple eval of argument */ #define RELATION_IS_LOCAL(relation) \ - ((relation)->rd_islocaltemp || \ - (relation)->rd_createSubid != InvalidSubTransactionId) + ((relation)->rd_islocaltemp) /* * RELATION_IS_OTHER_TEMP -- 1.8.3.1