doc/src/sgml/custom-scan.sgml | 12 ++++++++++++ doc/src/sgml/fdwhandler.sgml | 13 +++++++++++++ src/backend/executor/execParallel.c | 31 +++++++++++++++++++++++++++++-- src/backend/executor/nodeCustom.c | 9 +++++++++ src/backend/executor/nodeForeignscan.c | 16 ++++++++++++++++ src/include/executor/nodeCustom.h | 2 ++ src/include/executor/nodeForeignscan.h | 2 ++ src/include/foreign/fdwapi.h | 3 +++ src/include/nodes/extensible.h | 2 ++ 9 files changed, 88 insertions(+), 2 deletions(-) diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml index 1ca9247..4bd20dd 100644 --- a/doc/src/sgml/custom-scan.sgml +++ b/doc/src/sgml/custom-scan.sgml @@ -340,6 +340,18 @@ void (*InitializeWorkerCustomScan) (CustomScanState *node, +void (*ParallelFinishCustomScan) (CustomScanState *node, + ParallelContext *pcxt); + + Retrieve the custom state after all the worker get finished but prior + to the release of DSM segment. + This callback is optional, and needs only be supplied if this custom + path wants to reference the DSM segment in the master process's context + after the worker's exit. + + + + void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 0c1db07..9ae006a 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1254,6 +1254,19 @@ InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, This callback is optional, and needs only be supplied if this custom path supports parallel execution. + + + +void +ParallelFinishForeignScan(ForeignScanState *node, ParallelContext *pcxt); + + Retrieve the custom state after all the worker get finished but prior + to the release of DSM segment. + This callback is optional, and needs only be supplied if this custom + path wants to reference the DSM segment in the master process's context + after the worker's exit. + + diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 5aa6f02..3be3090 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -536,8 +536,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, } /* + * ExecParallelFinishRecursive - allows FDW/CSP to retrieve its own run-time + * statistics stores in the shared memory segment. + */ +static bool +ExecParallelFinishRecursive(PlanState *planstate, ParallelContext *pcxt) +{ + switch (nodeTag(planstate)) + { + case T_ForeignScanState: + ExecForeignScanParallelFinish((ForeignScanState *) planstate, + pcxt); + break; + case T_CustomScanState: + ExecCustomScanParallelFinish((CustomScanState *) planstate, + pcxt); + break; + default: + break; + } + return planstate_tree_walker(planstate, + ExecParallelFinishRecursive, + pcxt); +} + +/* * Finish parallel execution. We wait for parallel workers to finish, and - * accumulate their buffer usage and instrumentation. + * accumulate their buffer usage, instrumentation and others. */ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -554,10 +579,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < pei->pcxt->nworkers_launched; ++i) InstrAccumParallelQuery(&pei->buffer_usage[i]); - /* Finally, accumulate instrumentation, if any. */ + /* Next, accumulate instrumentation, if any. */ if (pei->instrumentation) ExecParallelRetrieveInstrumentation(pei->planstate, pei->instrumentation); + /* Finally, accumulate extension's stuff, if any */ + ExecParallelFinishRecursive(pei->planstate, pei->pcxt); pei->finished = true; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 322abca..b56e80b 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -204,3 +204,12 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) methods->InitializeWorkerCustomScan(node, toc, coordinate); } } + +void +ExecCustomScanParallelFinish(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->ParallelFinishCustomScan) + methods->ParallelFinishCustomScan(node, pcxt); +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index d886aaf..d2cb1f7 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -355,3 +355,19 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); } } + +/* ---------------------------------------------------------------- + * ExecForeignScanParallelFinish + * + * Retrieve FDW's own run-time statistics on the parallel coordication + * information prior to its release. + * ---------------------------------------------------------------- + */ +void +ExecForeignScanParallelFinish(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->ParallelFinishForeignScan) + fdwroutine->ParallelFinishForeignScan(node, pcxt); +} diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index 7d16c2b..92ba894 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -37,5 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt); extern void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc); +extern void ExecCustomScanParallelFinish(CustomScanState *node, + ParallelContext *pcxt); #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 0cdec4e..3384a01 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -28,5 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt); extern void ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc); +extern void ExecForeignScanParallelFinish(ForeignScanState *node, + ParallelContext *pcxt); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index e1b0d0d..1100f6d 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -151,6 +151,8 @@ typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, shm_toc *toc, void *coordinate); +typedef void (*ParallelFinishForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt); typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); @@ -224,6 +226,7 @@ typedef struct FdwRoutine EstimateDSMForeignScan_function EstimateDSMForeignScan; InitializeDSMForeignScan_function InitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; + ParallelFinishForeignScan_function ParallelFinishForeignScan; } FdwRoutine; diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 17afe58..9decbf6 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -139,6 +139,8 @@ typedef struct CustomExecMethods void (*InitializeWorkerCustomScan) (CustomScanState *node, shm_toc *toc, void *coordinate); + void (*ParallelFinishCustomScan) (CustomScanState *node, + ParallelContext *pcxt); /* Optional: print additional information in EXPLAIN */ void (*ExplainCustomScan) (CustomScanState *node,