diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 581a05c..551a689 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3017,14 +3017,13 @@ heap_delete(Relation relation, ItemPointer tid,
 	Assert(ItemPointerIsValid(tid));
 
 	/*
-	 * Forbid this during a parallel operation, lest it allocate a combocid.
-	 * Other workers might need that combocid for visibility checks, and we
-	 * have no provision for broadcasting it to them.
+	 * For now, parallel operations are required to be strictly read-only in
+	 * a parallel worker.
 	 */
-	if (IsInParallelMode())
+	if (IsParallelWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot delete tuples during a parallel operation")));
+				 errmsg("cannot delete tuples in a parallel worker")));
 
 	block = ItemPointerGetBlockNumber(tid);
 	buffer = ReadBuffer(relation, block);
@@ -3489,14 +3488,13 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	Assert(ItemPointerIsValid(otid));
 
 	/*
-	 * Forbid this during a parallel operation, lest it allocate a combocid.
-	 * Other workers might need that combocid for visibility checks, and we
-	 * have no provision for broadcasting it to them.
+	 * For now, parallel operations are required to be strictly read-only in
+	 * a parallel worker.
 	 */
-	if (IsInParallelMode())
+	if (IsParallelWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot update tuples during a parallel operation")));
+				 errmsg("cannot update tuples in a parallel worker")));
 
 	/*
 	 * Fetch the list of attributes to be checked for HOT update.  This is
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 42fc351..8557e29 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -53,8 +54,8 @@ GetNewTransactionId(bool isSubXact)
 	 * Workers synchronize transaction state at the beginning of each parallel
	 * operation, so we can't account for new XIDs after that point.
 	 */
-	if (IsInParallelMode())
-		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign TransactionIds in a parallel worker");
 
 	/*
 	 * During bootstrap initialization, we return the special bootstrap
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 82f9a3c..611ddbc 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -499,8 +499,8 @@ AssignTransactionId(TransactionState s)
 	 * Workers synchronize transaction state at the beginning of each parallel
 	 * operation, so we can't account for new XIDs at this point.
 	 */
-	if (IsInParallelMode() || IsParallelWorker())
-		elog(ERROR, "cannot assign XIDs during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign XIDs in a parallel worker");
 
 	/*
 	 * Ensure parent(s) have XIDs, so that a child always has an XID later
@@ -933,8 +933,8 @@ CommandCounterIncrement(void)
 	 * parallel operation, so we can't account for new commands after that
 	 * point.
 	 */
-	if (IsInParallelMode() || IsParallelWorker())
-		elog(ERROR, "cannot start commands during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot start commands in a parallel worker");
 
 	currentCommandId += 1;
 	if (currentCommandId == InvalidCommandId)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 9567e9a..e0470bd 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -78,6 +78,7 @@ static void InitPlan(QueryDesc *queryDesc, int eflags);
 static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
 static void ExecPostprocessPlan(EState *estate);
 static void ExecEndPlan(PlanState *planstate, EState *estate);
+static bool is_write_parallel_safe(EState *estate);
 static void ExecutePlan(EState *estate, PlanState *planstate,
 			bool use_parallel_mode,
 			CmdType operation,
@@ -1550,6 +1551,65 @@ ExecEndPlan(PlanState *planstate, EState *estate)
 	}
 }
 
+/*
+ * is_write_parallel_safe
+ *		Detect whether the given estate's write operation touches only
+ *		relations that are free of parallel-unsafe constructs.
+ *
+ * This must be determined before entering parallel mode; otherwise the
+ * query can fail later if the write operation turns out to involve a
+ * parallel-unsafe operation.
+ */
+static bool
+is_write_parallel_safe(EState *estate)
+{
+	/*
+	 * Check whether the command is a write operation at all before
+	 * proceeding to verify that everything it touches is parallel
+	 * safe.
+	 */
+	if (estate->es_plannedstmt->commandType == CMD_INSERT ||
+		estate->es_plannedstmt->commandType == CMD_DELETE ||
+		estate->es_plannedstmt->commandType == CMD_UPDATE)
+	{
+		int			i;
+
+		/*
+		 * Loop through all result relations and check whether any of
+		 * them has triggers or indexes with expressions, either of
+		 * which could invoke parallel-unsafe functions.
+		 *
+		 * FIXME: Not sure whether these are the only places where unsafe
+		 * functions can be present and lead to a failure.
+		 *
+		 * FIXME: Need to identify the expressions that contain unsafe
+		 * functions, instead of rejecting blindly.
+		 */
+		for (i = 0; i < estate->es_num_result_relations; i++)
+		{
+			int			j;
+
+			ResultRelInfo *current_result_relation
+								= estate->es_result_relations + i;
+
+			if (current_result_relation->ri_TrigDesc != NULL)
+				return false;
+
+			for (j = 0; j < current_result_relation->ri_NumIndices; j++)
+			{
+				IndexInfo  *idx_info
+							= *(current_result_relation->ri_IndexRelationInfo + j);
+
+				if (idx_info->ii_Expressions)
+					return false;
+			}
+		}
+	}
+
+	/* Found nothing parallel-unsafe (or this is not a write operation) */
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecutePlan
  *
@@ -1587,11 +1647,14 @@ ExecutePlan(EState *estate,
 
 	/*
 	 * If a tuple count was supplied, we must force the plan to run without
-	 * parallelism, because we might exit early.
+	 * parallelism, because we might exit early.  Also verify that a write
+	 * operation that is otherwise eligible for parallel mode contains only
+	 * parallel-safe expressions before allowing it to run in parallel.
 	 */
-	if (numberTuples)
+	if (numberTuples || (use_parallel_mode && !is_write_parallel_safe(estate)))
 		use_parallel_mode = false;
+
 
 	if (use_parallel_mode)
 		EnterParallelMode();
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index ca0ae78..29f594d 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -242,7 +242,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
 		dynamic_shared_memory_type != DSM_IMPL_NONE &&
-		parse->commandType == CMD_SELECT &&
+		parse->commandType != CMD_UTILITY &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker() &&
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index b19380e..d6b6156 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -1129,6 +1129,25 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 	if (node == NULL)
 		return false;
 
+	/* Check each member of a List for parallel hazards */
+	if (IsA(node, List))
+	{
+		ListCell   *temp;
+
+		foreach(temp, (List *) node)
+		{
+			if (max_parallel_hazard_walker((Node *) lfirst(temp), context))
+				return true;
+		}
+	}
+
+	/* Check for hazardous functions in a target entry's expression */
+	if (IsA(node, TargetEntry))
+	{
+		if (max_parallel_hazard_walker((Node *) ((TargetEntry *) node)->expr, context))
+			return true;
+	}
+
 	/* Check for hazardous functions in node itself */
 	if (check_functions_in_node(node, max_parallel_hazard_checker,
 								context))
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index f232c84..66403af 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -792,8 +793,8 @@ UpdateActiveSnapshotCommandId(void)
 	 */
 	save_curcid = ActiveSnapshot->as_snap->curcid;
 	curcid = GetCurrentCommandId(false);
-	if (IsInParallelMode() && save_curcid != curcid)
-		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+	if (IsParallelWorker() && save_curcid != curcid)
+		elog(ERROR, "cannot modify commandid in active snapshot in a parallel worker");
 	ActiveSnapshot->as_snap->curcid = curcid;
 }
 
diff --git a/src/test/regress/expected/write_parallel.out b/src/test/regress/expected/write_parallel.out
index e549cc2..a0f8c7a 100644
--- a/src/test/regress/expected/write_parallel.out
+++ b/src/test/regress/expected/write_parallel.out
@@ -10,7 +10,7 @@ set parallel_tuple_cost=0;
 set min_parallel_table_scan_size=0;
 set max_parallel_workers_per_gather=4;
 --
--- Test write operations that has an underlying query that is eligble
+-- Test utility write operations that have an underlying query that is eligible
 -- for parallel plans
 --
 explain (costs off) create table parallel_write as
@@ -77,4 +77,179 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
+--
+-- Test write operations that have an underlying query that is eligible
+-- for parallel plans
+--
+create table parallel_test(a int);
+explain (costs off) insert into parallel_test (select length(stringu1) from tenk1 group by length(stringu1));
+                           QUERY PLAN
+---------------------------------------------------------------
+ Insert on parallel_test
+   ->  Finalize HashAggregate
+         Group Key: (length((tenk1.stringu1)::text))
+         ->  Gather
+               Workers Planned: 4
+               ->  Partial HashAggregate
+                     Group Key: length((tenk1.stringu1)::text)
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+insert into parallel_test (select length(stringu1) from tenk1 group by length(stringu1));
+set enable_indexscan to off;
+set enable_bitmapscan to off;
+explain (costs off) update parallel_test set a = a from tenk1 where hundred > 100;
+                      QUERY PLAN
+------------------------------------------------------
+ Update on parallel_test
+   ->  Nested Loop
+         ->  Gather
+               Workers Planned: 4
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (hundred > 100)
+         ->  Gather
+               Workers Planned: 3
+               ->  Parallel Seq Scan on parallel_test
+(9 rows)
+
+update parallel_test set a = a from tenk1 where hundred > 100;
+explain (costs off) delete from tenk1 where hundred > 100;
+               QUERY PLAN
+----------------------------------------
+ Delete on tenk1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+               Filter: (hundred > 100)
+(5 rows)
+
+delete from tenk1 where hundred > 100;
+reset enable_indexscan;
+reset enable_bitmapscan;
+--
+-- Check the insert operation when the domain contains functions
+-- that are parallel restricted.
+--
+create function sql_is_distinct_from(anyelement, anyelement)
+returns boolean language sql
+as 'select $1 is distinct from $2 limit 1';
+create domain inotnull int
+  check (sql_is_distinct_from(value, null));
+create table dom_table (x inotnull);
+explain (costs off) insert into dom_table (select length(stringu1) from tenk1 group by length(stringu1));
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Insert on dom_table
+   ->  Subquery Scan on "*SELECT*"
+         ->  Finalize HashAggregate
+               Group Key: (length((tenk1.stringu1)::text))
+               ->  Gather
+                     Workers Planned: 4
+                     ->  Partial HashAggregate
+                           Group Key: length((tenk1.stringu1)::text)
+                           ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+insert into dom_table (select length(stringu1) from tenk1 group by length(stringu1));
+drop table dom_table;
+drop domain inotnull;
+drop function sql_is_distinct_from(anyelement, anyelement);
+--
+-- Check the insert operation when the index contains expressions
+-- that are parallel unsafe.
+-- (No parallel mode - Otherwise fails with "ERROR: cannot execute ANALYZE during a parallel operation")
+--
+CREATE TABLE vac (i int);
+CREATE FUNCTION test_analyze() RETURNS VOID VOLATILE LANGUAGE SQL
+	AS 'ANALYZE pg_am';
+CREATE FUNCTION wrap_test_analyze(c INT) RETURNS INT IMMUTABLE LANGUAGE SQL
+	AS 'SELECT $1 FROM test_analyze()';
+CREATE INDEX ON vac(wrap_test_analyze(i));
+explain (costs off) INSERT INTO vac select length(stringu1) from tenk1 group by length(stringu1);
+                           QUERY PLAN
+---------------------------------------------------------------
+ Insert on vac
+   ->  Finalize HashAggregate
+         Group Key: (length((tenk1.stringu1)::text))
+         ->  Gather
+               Workers Planned: 4
+               ->  Partial HashAggregate
+                     Group Key: length((tenk1.stringu1)::text)
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+INSERT INTO vac select length(stringu1) from tenk1 group by length(stringu1);
+drop table vac;
+drop function wrap_test_analyze(c INT);
+drop function test_analyze();
+--
+-- Check the insert operation when the table contains trigger functions
+-- (No parallel mode - Otherwise fails with "ERROR: cannot execute nextval() during a parallel operation")
+--
+CREATE TABLE y (a integer);
+INSERT INTO y SELECT generate_series(1, 10);
+CREATE SEQUENCE y_seq;
+CREATE FUNCTION y_trigger() RETURNS trigger AS $$
+declare
+count_val integer;
+begin
+	select nextval('y_seq') into count_val;
+	raise notice 'count: %', count_val;
+	raise notice 'y_trigger: a = %', new.a;
+	return new;
+end;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER y_trig BEFORE INSERT ON y FOR EACH ROW
+	EXECUTE PROCEDURE y_trigger();
+explain (costs off) insert into y (select length(stringu1) from tenk1 group by length(stringu1));
+                           QUERY PLAN
+---------------------------------------------------------------
+ Insert on y
+   ->  Finalize HashAggregate
+         Group Key: (length((tenk1.stringu1)::text))
+         ->  Gather
+               Workers Planned: 4
+               ->  Partial HashAggregate
+                     Group Key: length((tenk1.stringu1)::text)
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+insert into y (select length(stringu1) from tenk1 group by length(stringu1));
+NOTICE:  count: 1
+NOTICE:  y_trigger: a = 6
+drop trigger y_trig on y;
+drop function y_trigger();
+drop sequence y_seq;
+--
+-- Check the insert operation using CTE
+-- (No parallel mode - Otherwise fails when writing in a parallel worker)
+--
+explain (costs off) WITH t AS (
+    INSERT INTO y
+        (select length(stringu1) from tenk1 group by length(stringu1))
+    RETURNING *
+)
+SELECT * FROM t;
+                         QUERY PLAN
+-----------------------------------------------------------
+ CTE Scan on t
+   CTE t
+     ->  Insert on y
+           ->  HashAggregate
+                 Group Key: length((tenk1.stringu1)::text)
+                 ->  Seq Scan on tenk1
+(6 rows)
+
+WITH t AS (
+    INSERT INTO y
+        (select length(stringu1) from tenk1 group by length(stringu1))
+    RETURNING *
+)
+SELECT * FROM t;
+ a
+---
+ 6
+(1 row)
+
+drop table y;
 rollback;
diff --git a/src/test/regress/sql/write_parallel.sql b/src/test/regress/sql/write_parallel.sql
index 00f9156..9e51a7f 100644
--- a/src/test/regress/sql/write_parallel.sql
+++ b/src/test/regress/sql/write_parallel.sql
@@ -13,7 +13,7 @@ set min_parallel_table_scan_size=0;
 set max_parallel_workers_per_gather=4;
 
 --
--- Test write operations that has an underlying query that is eligble
+-- Test utility write operations that have an underlying query that is eligible
 -- for parallel plans
 --
 explain (costs off) create table parallel_write as
@@ -40,4 +40,102 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
 
+--
+-- Test write operations that have an underlying query that is eligible
+-- for parallel plans
+--
+create table parallel_test(a int);
+explain (costs off) insert into parallel_test (select length(stringu1) from tenk1 group by length(stringu1));
+insert into parallel_test (select length(stringu1) from tenk1 group by length(stringu1));
+
+set enable_indexscan to off;
+set enable_bitmapscan to off;
+
+explain (costs off) update parallel_test set a = a from tenk1 where hundred > 100;
+update parallel_test set a = a from tenk1 where hundred > 100;
+
+explain (costs off) delete from tenk1 where hundred > 100;
+delete from tenk1 where hundred > 100;
+
+reset enable_indexscan;
+reset enable_bitmapscan;
+
+--
+-- Check the insert operation when the domain contains functions
+-- that are parallel restricted.
+--
+create function sql_is_distinct_from(anyelement, anyelement)
+returns boolean language sql
+as 'select $1 is distinct from $2 limit 1';
+create domain inotnull int
+  check (sql_is_distinct_from(value, null));
+create table dom_table (x inotnull);
+explain (costs off) insert into dom_table (select length(stringu1) from tenk1 group by length(stringu1));
+insert into dom_table (select length(stringu1) from tenk1 group by length(stringu1));
+drop table dom_table;
+drop domain inotnull;
+drop function sql_is_distinct_from(anyelement, anyelement);
+
+--
+-- Check the insert operation when the index contains expressions
+-- that are parallel unsafe.
+-- (No parallel mode - Otherwise fails with "ERROR: cannot execute ANALYZE during a parallel operation")
+--
+CREATE TABLE vac (i int);
+CREATE FUNCTION test_analyze() RETURNS VOID VOLATILE LANGUAGE SQL
+	AS 'ANALYZE pg_am';
+CREATE FUNCTION wrap_test_analyze(c INT) RETURNS INT IMMUTABLE LANGUAGE SQL
+	AS 'SELECT $1 FROM test_analyze()';
+CREATE INDEX ON vac(wrap_test_analyze(i));
+explain (costs off) INSERT INTO vac select length(stringu1) from tenk1 group by length(stringu1);
+INSERT INTO vac select length(stringu1) from tenk1 group by length(stringu1);
+drop table vac;
+drop function wrap_test_analyze(c INT);
+drop function test_analyze();
+
+--
+-- Check the insert operation when the table contains trigger functions
+-- (No parallel mode - Otherwise fails with "ERROR: cannot execute nextval() during a parallel operation")
+--
+CREATE TABLE y (a integer);
+INSERT INTO y SELECT generate_series(1, 10);
+CREATE SEQUENCE y_seq;
+CREATE FUNCTION y_trigger() RETURNS trigger AS $$
+declare
+count_val integer;
+begin
+	select nextval('y_seq') into count_val;
+	raise notice 'count: %', count_val;
+	raise notice 'y_trigger: a = %', new.a;
+	return new;
+end;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER y_trig BEFORE INSERT ON y FOR EACH ROW
+	EXECUTE PROCEDURE y_trigger();
+explain (costs off) insert into y (select length(stringu1) from tenk1 group by length(stringu1));
+insert into y (select length(stringu1) from tenk1 group by length(stringu1));
+drop trigger y_trig on y;
+drop function y_trigger();
+drop sequence y_seq;
+
+--
+-- Check the insert operation using CTE
+-- (No parallel mode - Otherwise fails when writing in a parallel worker)
+--
+explain (costs off) WITH t AS (
+    INSERT INTO y
+        (select length(stringu1) from tenk1 group by length(stringu1))
+    RETURNING *
+)
+SELECT * FROM t;
+
+WITH t AS (
+    INSERT INTO y
+        (select length(stringu1) from tenk1 group by length(stringu1))
+    RETURNING *
+)
+SELECT * FROM t;
+
+drop table y;
+
 rollback;
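
Not part of the patch, just a quick manual sketch of how the planner change can be exercised outside the regression suite, using the same cost settings as write_parallel.sql.  The table name parallel_write_demo is illustrative only; tenk1 is assumed to be available, as in the regression database.

begin;
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set min_parallel_table_scan_size = 0;
set max_parallel_workers_per_gather = 4;
create table parallel_write_demo (a int);
-- With the patch applied, the plan is expected to show a Gather node
-- below the Insert, as in the regression output above.
explain (costs off)
insert into parallel_write_demo
    select length(stringu1) from tenk1 group by length(stringu1);
rollback;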