diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 558cb08..64ca8d8 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -819,19 +819,6 @@ ExecParallelReinitialize(PlanState *planstate,
 	/* Old workers must already be shut down */
 	Assert(pei->finished);
 
-	/* Clear the instrumentation space from the last round. */
-	if (pei->instrumentation)
-	{
-		Instrumentation *instrument;
-		SharedExecutorInstrumentation *sh_instr;
-		int			i;
-
-		sh_instr = pei->instrumentation;
-		instrument = GetInstrumentationArray(sh_instr);
-		for (i = 0; i < sh_instr->num_workers * sh_instr->num_plan_nodes; ++i)
-			InstrInit(&instrument[i], pei->planstate->state->es_instrument);
-	}
-
 	/* Force parameters we're going to pass to workers to be evaluated. */
 	ExecEvalParamExecParams(sendParams, estate);
 
@@ -910,12 +897,8 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 											  pcxt);
 			break;
 		case T_HashState:
-			/* even when not parallel-aware, for EXPLAIN ANALYZE */
-			ExecHashReInitializeDSM((HashState *) planstate, pcxt);
-			break;
 		case T_SortState:
-			/* even when not parallel-aware, for EXPLAIN ANALYZE */
-			ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+			/* these nodes have DSM state, but no reinitialization is required */
 			break;
 
 		default:
@@ -1046,11 +1029,6 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
-	/* Finally, accumulate instrumentation, if any. */
-	if (pei->instrumentation)
-		ExecParallelRetrieveInstrumentation(pei->planstate,
-											pei->instrumentation);
-
 	pei->finished = true;
 }
 
@@ -1063,6 +1041,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+	/* Accumulate instrumentation, if any. */
+	if (pei->instrumentation)
+		ExecParallelRetrieveInstrumentation(pei->planstate,
+											pei->instrumentation);
+
 	/* Free any serialized parameters. */
 	if (DsaPointerIsValid(pei->param_exec))
 	{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6fe5d69..afd7384 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1670,19 +1670,6 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
 }
 
 /*
- * Reset shared state before beginning a fresh scan.
- */
-void
-ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
-{
-	if (node->shared_info != NULL)
-	{
-		memset(node->shared_info->hinstrument, 0,
-			   node->shared_info->num_workers * sizeof(HashInstrumentation));
-	}
-}
-
-/*
  * Locate the DSM space for hash table instrumentation data that we'll write
  * to at shutdown time.
  */
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 73aa371..d593378 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -397,23 +397,6 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
 }
 
 /* ----------------------------------------------------------------
- *		ExecSortReInitializeDSM
- *
- *		Reset shared state before beginning a fresh scan.
- * ----------------------------------------------------------------
- */
-void
-ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
-{
-	/* If there's any instrumentation space, clear it for next time */
-	if (node->shared_info != NULL)
-	{
-		memset(node->shared_info->sinstrument, 0,
-			   node->shared_info->num_workers * sizeof(TuplesortInstrumentation));
-	}
-}
-
-/* ----------------------------------------------------------------
  *		ExecSortInitializeWorker
  *
  *		Attach worker to DSM space for sort statistics.
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 75d4c70..0974f1e 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -52,7 +52,6 @@ extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
-extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
 extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index cc61a9d..627a04c 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -26,7 +26,6 @@ extern void ExecReScanSort(SortState *node);
 /* parallel instrumentation support */
 extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
-extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
 extern void ExecSortRetrieveInstrumentation(SortState *node);
 
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index ff00d47..5323d67 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -487,12 +487,47 @@ explain (analyze, timing off, summary off, costs off)
 (11 rows)
 
 alter table tenk2 reset (parallel_workers);
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
+                        explain_parallel_sort_stats
+--------------------------------------------------------------------------
+ Nested Loop Left Join (actual rows=30000 loops=1)
+   ->  Values Scan on "*VALUES*" (actual rows=3 loops=1)
+   ->  Gather Merge (actual rows=10000 loops=3)
+         Workers Planned: 4
+         Workers Launched: 4
+         ->  Sort (actual rows=2000 loops=15)
+               Sort Key: tenk1.ten
+               Sort Method: quicksort  Memory: xxx
+               Worker 0:  Sort Method: quicksort  Memory: xxx
+               Worker 1:  Sort Method: quicksort  Memory: xxx
+               Worker 2:  Sort Method: quicksort  Memory: xxx
+               Worker 3:  Sort Method: quicksort  Memory: xxx
+               ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=15)
+                     Filter: (ten < 100)
+(14 rows)
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
-reset work_mem;
 drop table bmscantest;
 -- test parallel merge join path.
 set enable_hashjoin to off;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 1035d04..c1b2d9a 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -187,12 +187,30 @@ explain (analyze, timing off, summary off, costs off)
   and tenk2.thousand=0;
 alter table tenk2 reset (parallel_workers);
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
-reset work_mem;
 drop table bmscantest;
 -- test parallel merge join path.
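
Note on the execParallel.c hunks: the shared instrumentation space is no longer zeroed at each rescan, and retrieval moves from ExecParallelFinish (which runs after every scan of the Gather/Gather Merge node) to ExecParallelCleanup (which runs exactly once at executor shutdown), so per-worker stats keep accumulating across rescans and are still available when EXPLAIN reads them; the new regression test exercises this via the rescanned parallel sort. The following standalone C sketch illustrates that shape only; it is not PostgreSQL code, and all names (WorkerStats, ParallelRun, run_scan, cleanup) are invented for the example.

```c
#include <stdio.h>
#include <string.h>

#define NWORKERS 2

typedef struct WorkerStats
{
	long		tuples;			/* per-worker counter living in "shared memory" */
} WorkerStats;

typedef struct ParallelRun
{
	WorkerStats shared[NWORKERS];	/* stands in for the DSM instrumentation array */
	long		leader_total;		/* stands in for the leader's accumulated totals */
} ParallelRun;

/* one scan (or rescan): workers keep adding to the shared counters,
 * which are NOT reset between scans */
static void
run_scan(ParallelRun *run, long per_worker)
{
	for (int i = 0; i < NWORKERS; i++)
		run->shared[i].tuples += per_worker;
}

/* analogous to the patched ExecParallelCleanup(): accumulate exactly once */
static void
cleanup(ParallelRun *run)
{
	for (int i = 0; i < NWORKERS; i++)
		run->leader_total += run->shared[i].tuples;
}

int
main(void)
{
	ParallelRun run;

	memset(&run, 0, sizeof(run));
	run_scan(&run, 100);		/* first scan */
	run_scan(&run, 100);		/* rescan: shared counters keep growing */
	cleanup(&run);				/* single accumulation at teardown */
	printf("leader total: %ld\n", run.leader_total);	/* prints 400 */
	return 0;
}
```

Had the shared counters been cleared before the rescan and accumulated after every scan, the totals would depend on when the rescan happened; accumulating once at teardown over never-cleared counters sidesteps that, which is the behavior the hunks above implement.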