From fb643168973e3bbb2fb12a220c93931cde794bb0 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 30 Nov 2017 01:39:05 +1300 Subject: [PATCH] Make sure es_query_dsa is set correctly in the leader process. Commit e13029a5ce353574516c64fd1ec9c50201e705fd added es_query_dsa as a member of EState to provide a DSA area for use by executor nodes. That works for worker processes where all executor nodes use the same shared memory, but doesn't work correctly in the leader process where there may be more than one Gather or Gather Merge node each with its own DSA area. Repair by installing each Gather or Gather Merge node's DSA area into es_query_dsa only for the duration of appropriate calls into the query tree below. A bigger change adopting better scoping will be studied for version 11. Author: Thomas Munro Reviewed-By: Amit Kapila, Robert Haas Tested-By: Alexander Voytsekhovskyy, Amit Kapila, Andreas Seltenreich Discussion: https://postgr.es/m/CAEepm=1U6as=brnVvMNixEV2tpi8NuyQoTmO8Qef0-VV+=7MDA@mail.gmail.com --- src/backend/executor/execParallel.c | 26 ++++++++++++++------------ src/backend/executor/nodeGather.c | 6 ++++++ src/backend/executor/nodeGatherMerge.c | 4 ++++ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index d57cdbd4e15..6b6064637b8 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -330,7 +330,7 @@ EstimateParamExecSpace(EState *estate, Bitmapset *params) * parameter array) and then the datum as serialized by datumSerialize(). */ static dsa_pointer -SerializeParamExecParams(EState *estate, Bitmapset *params) +SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area) { Size size; int nparams; @@ -341,8 +341,8 @@ SerializeParamExecParams(EState *estate, Bitmapset *params) /* Allocate enough space for the current parameter values. */ size = EstimateParamExecSpace(estate, params); - handle = dsa_allocate(estate->es_query_dsa, size); - start_address = dsa_get_address(estate->es_query_dsa, handle); + handle = dsa_allocate(area, size); + start_address = dsa_get_address(area, handle); /* First write the number of parameters as a 4-byte integer. */ nparams = bms_num_members(params); @@ -736,12 +736,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, LWTRANCHE_PARALLEL_QUERY_DSA, pcxt->seg); - /* - * Make the area available to executor nodes running in the leader. - * See also ParallelQueryMain which makes it available to workers. - */ - estate->es_query_dsa = pei->area; - /* * Serialize parameters, if any, using DSA storage. We don't dare use * the main parallel query DSM for this because we might relaunch @@ -750,7 +744,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, */ if (!bms_is_empty(sendParams)) { - pei->param_exec = SerializeParamExecParams(estate, sendParams); + pei->param_exec = SerializeParamExecParams(estate, sendParams, + pei->area); fpes->param_exec = pei->param_exec; } } @@ -763,7 +758,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, d.pcxt = pcxt; d.instrumentation = instrumentation; d.nnodes = 0; + + /* Install our DSA area while initializing the plan. */ + estate->es_query_dsa = pei->area; ExecParallelInitializeDSM(planstate, &d); + estate->es_query_dsa = NULL; /* * Make sure that the world hasn't shifted under our feet. This could @@ -832,19 +831,22 @@ ExecParallelReinitialize(PlanState *planstate, /* Free any serialized parameters from the last round. */ if (DsaPointerIsValid(fpes->param_exec)) { - dsa_free(estate->es_query_dsa, fpes->param_exec); + dsa_free(pei->area, fpes->param_exec); fpes->param_exec = InvalidDsaPointer; } /* Serialize current parameter values if required. */ if (!bms_is_empty(sendParams)) { - pei->param_exec = SerializeParamExecParams(estate, sendParams); + pei->param_exec = SerializeParamExecParams(estate, sendParams, + pei->area); fpes->param_exec = pei->param_exec; } /* Traverse plan tree and let each child node reset associated state. */ + estate->es_query_dsa = pei->area; ExecParallelReInitializeDSM(planstate, pei->pcxt); + estate->es_query_dsa = NULL; } /* diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index a44cf8409af..1697ae650d7 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -277,7 +277,13 @@ gather_getnext(GatherState *gatherstate) if (gatherstate->need_to_scan_locally) { + EState *estate = gatherstate->ps.state; + + /* Install our DSA area while executing the plan. */ + estate->es_query_dsa = + gatherstate->pei ? gatherstate->pei->area : NULL; outerTupleSlot = ExecProcNode(outerPlan); + estate->es_query_dsa = NULL; if (!TupIsNull(outerTupleSlot)) return outerTupleSlot; diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 4a8a59eabf1..a69777aa951 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -637,8 +637,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) { PlanState *outerPlan = outerPlanState(gm_state); TupleTableSlot *outerTupleSlot; + EState *estate = gm_state->ps.state; + /* Install our DSA area while executing the plan. */ + estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL; outerTupleSlot = ExecProcNode(outerPlan); + estate->es_query_dsa = NULL; if (!TupIsNull(outerTupleSlot)) { -- 2.15.0