From ced3307f27f01e657499ae6ef4436efaa5e350e5 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Tue, 15 May 2018 20:21:32 +0900 Subject: [PATCH v3 2/3] infrastructure for asynchronous execution This patch adds infrastructure for asynchronous execution. As a PoC, this makes only Append capable of handling asynchronously executable subnodes. --- src/backend/commands/explain.c | 17 ++ src/backend/executor/Makefile | 1 + src/backend/executor/execAsync.c | 152 +++++++++++ src/backend/executor/nodeAppend.c | 342 ++++++++++++++++++++---- src/backend/executor/nodeForeignscan.c | 21 ++ src/backend/nodes/bitmapset.c | 72 +++++ src/backend/nodes/copyfuncs.c | 3 + src/backend/nodes/outfuncs.c | 3 + src/backend/nodes/readfuncs.c | 3 + src/backend/optimizer/plan/createplan.c | 66 ++++- src/backend/postmaster/pgstat.c | 3 + src/backend/postmaster/syslogger.c | 2 +- src/backend/utils/adt/ruleutils.c | 8 +- src/backend/utils/resowner/resowner.c | 4 +- src/include/executor/execAsync.h | 22 ++ src/include/executor/executor.h | 1 + src/include/executor/nodeForeignscan.h | 3 + src/include/foreign/fdwapi.h | 11 + src/include/nodes/bitmapset.h | 1 + src/include/nodes/execnodes.h | 23 +- src/include/nodes/plannodes.h | 9 + src/include/pgstat.h | 3 +- 22 files changed, 705 insertions(+), 65 deletions(-) create mode 100644 src/backend/executor/execAsync.c create mode 100644 src/include/executor/execAsync.h diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index efd7201d61..708e9ed546 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -86,6 +86,7 @@ static void show_incremental_sort_keys(IncrementalSortState *incrsortstate, List *ancestors, ExplainState *es); static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ExplainState *es); +static void show_append_info(AppendState *astate, ExplainState *es); static void show_agg_keys(AggState *astate, List *ancestors, ExplainState *es); static void 
show_grouping_sets(PlanState *planstate, Agg *agg, @@ -1389,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (plan->async_capable) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1969,6 +1972,11 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Hash: show_hash_info(castNode(HashState, planstate), es); break; + + case T_Append: + show_append_info(castNode(AppendState, planstate), es); + break; + default: break; } @@ -2322,6 +2330,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ancestors, es); } +static void +show_append_info(AppendState *astate, ExplainState *es) +{ + Append *plan = (Append *) astate->ps.plan; + + if (plan->nasyncplans > 0) + ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es); +} + /* * Show the grouping keys for an Agg node. */ diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f990c6473a..1004647d4f 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c new file mode 100644 index 0000000000..2b7d1877e0 --- /dev/null +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,152 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution. 
+ * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeAppend.h" +#include "executor/nodeForeignscan.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* + * ExecAsyncConfigureWait: Add wait event to the WaitEventSet if needed. + * + * If reinit is true, the caller didn't reuse existing WaitEventSet. + */ +bool +ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit) +{ + switch (nodeTag(node)) + { + case T_ForeignScanState: + return ExecForeignAsyncConfigureWait((ForeignScanState *)node, + wes, data, reinit); + break; + default: + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(node)); + } +} + +/* + * struct for memory context callback argument used in ExecAsyncEventWait + */ +typedef struct { + int **p_refind; + int *p_refindsize; +} ExecAsync_mcbarg; + +/* + * callback function to reset static variables pointing to the memory in + * TopTransactionContext in ExecAsyncEventWait. + */ +static void ExecAsyncMemoryContextCallback(void *arg) +{ + /* arg is the address of the variable refind in ExecAsyncEventWait */ + ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg; + *mcbarg->p_refind = NULL; + *mcbarg->p_refindsize = 0; +} + +#define EVENT_BUFFER_SIZE 16 + +/* + * ExecAsyncEventWait: + * + * Wait for async events to fire. Returns the Bitmapset of fired events. 
+ */ +Bitmapset * +ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout) +{ + static int *refind = NULL; + static int refindsize = 0; + WaitEventSet *wes; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred = 0; + Bitmapset *fired_events = NULL; + int i; + int n; + + n = bms_num_members(waitnodes); + wes = CreateWaitEventSet(TopTransactionContext, + TopTransactionResourceOwner, n); + if (refindsize < n) + { + if (refindsize == 0) + refindsize = EVENT_BUFFER_SIZE; /* XXX */ + while (refindsize < n) + refindsize *= 2; + if (refind) + refind = (int *) repalloc(refind, refindsize * sizeof(int)); + else + { + static ExecAsync_mcbarg mcb_arg = + { &refind, &refindsize }; + static MemoryContextCallback mcb = + { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL }; + MemoryContext oldctxt = + MemoryContextSwitchTo(TopTransactionContext); + + /* + * refind points to a memory block in + * TopTransactionContext. Register a callback to reset it. + */ + MemoryContextRegisterResetCallback(TopTransactionContext, &mcb); + refind = (int *) palloc(refindsize * sizeof(int)); + MemoryContextSwitchTo(oldctxt); + } + } + + /* Prepare WaitEventSet for waiting on the waitnodes. */ + n = 0; + for (i = bms_next_member(waitnodes, -1) ; i >= 0 ; + i = bms_next_member(waitnodes, i)) + { + refind[i] = i; + if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true)) + n++; + } + + /* Return immediately if no node to wait. 
*/ + if (n == 0) + { + FreeWaitEventSet(wes); + return NULL; + } + + noccurred = WaitEventSetWait(wes, timeout, occurred_event, + EVENT_BUFFER_SIZE, + WAIT_EVENT_ASYNC_WAIT); + FreeWaitEventSet(wes); + if (noccurred == 0) + return NULL; + + for (i = 0 ; i < noccurred ; i++) + { + WaitEvent *w = &occurred_event[i]; + + if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0) + { + int n = *(int*)w->user_data; + + fired_events = bms_add_member(fired_events, n); + } + } + + return fired_events; +} diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 88919e62fa..60c36ee048 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -60,6 +60,7 @@ #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" +#include "executor/execAsync.h" #include "miscadmin.h" /* Shared state for parallel-aware Append. */ @@ -80,6 +81,7 @@ struct ParallelAppendState #define INVALID_SUBPLAN_INDEX -1 static TupleTableSlot *ExecAppend(PlanState *pstate); +static TupleTableSlot *ExecAppendAsync(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); @@ -103,22 +105,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags) PlanState **appendplanstates; Bitmapset *validsubplans; int nplans; + int nasyncplans; int firstvalid; int i, j; /* check for unsupported flags */ - Assert(!(eflags & EXEC_FLAG_MARK)); + Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC))); /* * create new AppendState for our append node */ appendstate->ps.plan = (Plan *) node; appendstate->ps.state = estate; - appendstate->ps.ExecProcNode = ExecAppend; /* Let choose_next_subplan_* function handle setting the first subplan */ - appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX; /* If run-time 
partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -152,11 +154,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* * When no run-time pruning is required and there's at least one - * subplan, we can fill as_valid_subplans immediately, preventing + * subplan, we can fill as_valid_syncsubplans immediately, preventing * later calls to ExecFindMatchingSubPlans. */ if (!prunestate->do_exec_prune && nplans > 0) - appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); + appendstate->as_valid_syncsubplans = + bms_add_range(NULL, node->nasyncplans, nplans - 1); } else { @@ -167,8 +170,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * subplans as valid; they must also all be initialized. */ Assert(nplans > 0); - appendstate->as_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); + validsubplans = bms_add_range(NULL, 0, nplans - 1); + appendstate->as_valid_syncsubplans = + bms_add_range(NULL, node->nasyncplans, nplans - 1); appendstate->as_prune_state = NULL; } @@ -192,10 +196,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags) */ j = 0; firstvalid = nplans; + nasyncplans = 0; + i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + int sub_eflags = eflags; + + /* Let async-capable subplans run asynchronously */ + if (i < node->nasyncplans) + { + sub_eflags |= EXEC_FLAG_ASYNC; + nasyncplans++; + } /* * Record the lowest appendplans index which is a valid partial plan. 
@@ -203,13 +217,46 @@ ExecInitAppend(Append *node, EState *estate, int eflags) if (i >= node->first_partial_plan && j < firstvalid) firstvalid = j; - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); + appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags); } appendstate->as_first_partial_plan = firstvalid; appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* fill in async stuff */ + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_syncdone = (nasyncplans == nplans); + appendstate->as_exec_prune = false; + + /* choose appropriate version of Exec function */ + if (appendstate->as_nasyncplans == 0) + appendstate->ps.ExecProcNode = ExecAppend; + else + appendstate->ps.ExecProcNode = ExecAppendAsync; + + if (appendstate->as_nasyncplans) + { + appendstate->as_asyncresult = (TupleTableSlot **) + palloc0(appendstate->as_nasyncplans * sizeof(TupleTableSlot *)); + + /* initially, all async requests need a request */ + appendstate->as_needrequest = + bms_add_range(NULL, 0, appendstate->as_nasyncplans - 1); + + /* + * ExecAppendAsync needs as_valid_syncsubplans to handle async + * subnodes. + */ + if (appendstate->as_prune_state != NULL && + appendstate->as_prune_state->do_exec_prune) + { + Assert(appendstate->as_valid_syncsubplans == NULL); + + appendstate->as_exec_prune = true; + } + } + /* * Miscellaneous initialization */ @@ -233,7 +280,7 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); - if (node->as_whichplan < 0) + if (node->as_whichsyncplan < 0) { /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) @@ -243,11 +290,13 @@ ExecAppend(PlanState *pstate) * If no subplan has been chosen, we must choose one before * proceeding. 
*/ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && + if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX && !node->choose_next_subplan(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } + Assert(node->as_nasyncplans == 0); + for (;;) { PlanState *subnode; @@ -258,8 +307,9 @@ ExecAppend(PlanState *pstate) /* * figure out which subplan we are currently processing */ - Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); - subnode = node->appendplans[node->as_whichplan]; + Assert(node->as_whichsyncplan >= 0 && + node->as_whichsyncplan < node->as_nplans); + subnode = node->appendplans[node->as_whichsyncplan]; /* * get a tuple from the subplan @@ -282,6 +332,172 @@ ExecAppend(PlanState *pstate) } } +static TupleTableSlot * +ExecAppendAsync(PlanState *pstate) +{ + AppendState *node = castNode(AppendState, pstate); + Bitmapset *needrequest; + int i; + + Assert(node->as_nasyncplans > 0); + +restart: + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + if (node->as_exec_prune) + { + Bitmapset *valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + + /* Distribute valid subplans into sync and async */ + node->as_needrequest = + bms_intersect(node->as_needrequest, valid_subplans); + node->as_valid_syncsubplans = + bms_difference(valid_subplans, node->as_needrequest); + + node->as_exec_prune = false; + } + + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + while ((i = bms_first_member(needrequest)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + + slot = ExecProcNode(subnode); + if (subnode->asyncstate == AS_AVAILABLE) + { + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = bms_add_member(node->as_needrequest, i); + } + } + else + node->as_pending_async = bms_add_member(node->as_pending_async, i); + } + bms_free(needrequest); + + for (;;) + { + 
TupleTableSlot *result; + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + while (!bms_is_empty(node->as_pending_async)) + { + /* Don't wait for async nodes if any sync node exists. */ + long timeout = node->as_syncdone ? -1 : 0; + Bitmapset *fired; + int i; + + fired = ExecAsyncEventWait(node->appendplans, + node->as_pending_async, + timeout); + + if (bms_is_empty(fired) && node->as_syncdone) + { + /* + * We come here when all the subnodes had fired before + * waiting. Retry fetching from the nodes. + */ + node->as_needrequest = node->as_pending_async; + node->as_pending_async = NULL; + goto restart; + } + + while ((i = bms_first_member(fired)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + slot = ExecProcNode(subnode); + + Assert(subnode->asyncstate == AS_AVAILABLE); + + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = + bms_add_member(node->as_needrequest, i); + } + + node->as_pending_async = + bms_del_member(node->as_pending_async, i); + } + bms_free(fired); + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + if (!node->as_syncdone) + break; + } + + /* + * If there is no asynchronous activity still pending and the + * synchronous activity is also complete, we're totally done scanning + * this node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the synchronous children. 
+ */ + + if (!node->as_syncdone && + node->as_whichsyncplan == INVALID_SUBPLAN_INDEX) + node->as_syncdone = !node->choose_next_subplan(node); + + if (node->as_syncdone) + { + Assert(bms_is_empty(node->as_pending_async)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + + /* + * get a tuple from the subplan + */ + result = ExecProcNode(node->appendplans[node->as_whichsyncplan]); + + if (!TupIsNull(result)) + { + /* + * If the subplan gave us something then return it as-is. We do + * NOT make use of the result slot that was set up in + * ExecInitAppend; there's no need for it. + */ + return result; + } + + /* + * Go on to the "next" subplan. If no more subplans, return the empty + * slot set up for us by ExecInitAppend, unless there are async plans + * we have yet to finish. + */ + if (!node->choose_next_subplan(node)) + { + node->as_syncdone = true; + if (bms_is_empty(node->as_pending_async)) + { + Assert(bms_is_empty(node->as_needrequest)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + } + + /* Else loop back and try to get a tuple from the new subplan */ + } +} + /* ---------------------------------------------------------------- * ExecEndAppend * @@ -324,10 +540,18 @@ ExecReScanAppend(AppendState *node) bms_overlap(node->ps.chgParam, node->as_prune_state->execparamids)) { - bms_free(node->as_valid_subplans); - node->as_valid_subplans = NULL; + bms_free(node->as_valid_syncsubplans); + node->as_valid_syncsubplans = NULL; } + /* Reset async state. 
*/ + for (i = 0; i < node->as_nasyncplans; ++i) + ExecShutdownNode(node->appendplans[i]); + + node->as_nasyncresult = 0; + node->as_needrequest = bms_add_range(NULL, 0, node->as_nasyncplans - 1); + node->as_syncdone = (node->as_nasyncplans == node->as_nplans); + for (i = 0; i < node->as_nplans; i++) { PlanState *subnode = node->appendplans[i]; @@ -348,7 +572,7 @@ ExecReScanAppend(AppendState *node) } /* Let choose_next_subplan_* function handle setting the first subplan */ - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; } /* ---------------------------------------------------------------- @@ -436,7 +660,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) static bool choose_next_subplan_locally(AppendState *node) { - int whichplan = node->as_whichplan; + int whichplan = node->as_whichsyncplan; int nextplan; /* We should never be called when there are no subplans */ @@ -451,10 +675,18 @@ choose_next_subplan_locally(AppendState *node) */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_valid_subplans == NULL) - node->as_valid_subplans = + /* Shouldn't have an active async node */ + Assert(bms_is_empty(node->as_needrequest)); + + if (node->as_valid_syncsubplans == NULL) + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); + /* Exclude async plans */ + if (node->as_nasyncplans > 0) + bms_del_range(node->as_valid_syncsubplans, + 0, node->as_nasyncplans - 1); + whichplan = -1; } @@ -462,14 +694,14 @@ choose_next_subplan_locally(AppendState *node) Assert(whichplan >= -1 && whichplan <= node->as_nplans); if (ScanDirectionIsForward(node->ps.state->es_direction)) - nextplan = bms_next_member(node->as_valid_subplans, whichplan); + nextplan = bms_next_member(node->as_valid_syncsubplans, whichplan); else - nextplan = bms_prev_member(node->as_valid_subplans, whichplan); + nextplan = bms_prev_member(node->as_valid_syncsubplans, whichplan); if (nextplan < 0) return false; 
- node->as_whichplan = nextplan; + node->as_whichsyncplan = nextplan; return true; } @@ -490,29 +722,29 @@ choose_next_subplan_for_leader(AppendState *node) /* Backward scan is not supported by parallel-aware plans */ Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + /* We should never be called when there are no sync subplans */ + Assert(node->as_nplans > node->as_nasyncplans); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) { /* Mark just-completed subplan as finished. */ - node->as_pstate->pa_finished[node->as_whichplan] = true; + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; } else { /* Start with last subplan. */ - node->as_whichplan = node->as_nplans - 1; + node->as_whichsyncplan = node->as_nplans - 1; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be * set to all subplans. */ - if (node->as_valid_subplans == NULL) + if (node->as_valid_syncsubplans == NULL) { - node->as_valid_subplans = + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); /* @@ -524,26 +756,26 @@ choose_next_subplan_for_leader(AppendState *node) } /* Loop until we find a subplan to execute. */ - while (pstate->pa_finished[node->as_whichplan]) + while (pstate->pa_finished[node->as_whichsyncplan]) { - if (node->as_whichplan == 0) + if (node->as_whichsyncplan == 0) { pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; LWLockRelease(&pstate->pa_lock); return false; } /* - * We needn't pay attention to as_valid_subplans here as all invalid + * We needn't pay attention to as_valid_syncsubplans here as all invalid * plans have been marked as finished. 
*/ - node->as_whichplan--; + node->as_whichsyncplan--; } /* If non-partial, immediately mark as finished. */ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); @@ -571,23 +803,23 @@ choose_next_subplan_for_worker(AppendState *node) /* Backward scan is not supported by parallel-aware plans */ Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + /* We should never be called when there are no sync subplans */ + Assert(node->as_nplans > node->as_nasyncplans); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); /* Mark just-completed subplan as finished. */ - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be set * to all subplans. */ - else if (node->as_valid_subplans == NULL) + else if (node->as_valid_syncsubplans == NULL) { - node->as_valid_subplans = + node->as_valid_syncsubplans = ExecFindMatchingSubPlans(node->as_prune_state); mark_invalid_subplans_as_finished(node); } @@ -600,30 +832,30 @@ choose_next_subplan_for_worker(AppendState *node) } /* Save the plan from which we are starting the search. */ - node->as_whichplan = pstate->pa_next_plan; + node->as_whichsyncplan = pstate->pa_next_plan; /* Loop until we find a valid subplan to execute. 
*/ while (pstate->pa_finished[pstate->pa_next_plan]) { int nextplan; - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as_valid_syncsubplans, pstate->pa_next_plan); if (nextplan >= 0) { /* Advance to the next valid plan. */ pstate->pa_next_plan = nextplan; } - else if (node->as_whichplan > node->as_first_partial_plan) + else if (node->as_whichsyncplan > node->as_first_partial_plan) { /* * Try looping back to the first valid partial plan, if there is * one. If there isn't, arrange to bail out below. */ - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as_valid_syncsubplans, node->as_first_partial_plan - 1); pstate->pa_next_plan = - nextplan < 0 ? node->as_whichplan : nextplan; + nextplan < 0 ? node->as_whichsyncplan : nextplan; } else { @@ -631,10 +863,10 @@ choose_next_subplan_for_worker(AppendState *node) * At last plan, and either there are no partial plans or we've * tried them all. Arrange to bail out. */ - pstate->pa_next_plan = node->as_whichplan; + pstate->pa_next_plan = node->as_whichsyncplan; } - if (pstate->pa_next_plan == node->as_whichplan) + if (pstate->pa_next_plan == node->as_whichsyncplan) { /* We've tried everything! */ pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; @@ -644,8 +876,8 @@ choose_next_subplan_for_worker(AppendState *node) } /* Pick the plan we found, and advance pa_next_plan one more time. 
*/ - node->as_whichplan = pstate->pa_next_plan; - pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, + node->as_whichsyncplan = pstate->pa_next_plan; + pstate->pa_next_plan = bms_next_member(node->as_valid_syncsubplans, pstate->pa_next_plan); /* @@ -654,7 +886,7 @@ choose_next_subplan_for_worker(AppendState *node) */ if (pstate->pa_next_plan < 0) { - int nextplan = bms_next_member(node->as_valid_subplans, + int nextplan = bms_next_member(node->as_valid_syncsubplans, node->as_first_partial_plan - 1); if (nextplan >= 0) @@ -671,8 +903,8 @@ choose_next_subplan_for_worker(AppendState *node) } /* If non-partial, immediately mark as finished. */ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); @@ -699,13 +931,13 @@ mark_invalid_subplans_as_finished(AppendState *node) Assert(node->as_prune_state); /* Nothing to do if all plans are valid */ - if (bms_num_members(node->as_valid_subplans) == node->as_nplans) + if (bms_num_members(node->as_valid_syncsubplans) == node->as_nplans) return; /* Mark all non-valid plans as finished */ for (i = 0; i < node->as_nplans; i++) { - if (!bms_is_member(i, node->as_valid_subplans)) + if (!bms_is_member(i, node->as_valid_syncsubplans)) node->as_pstate->pa_finished[i] = true; } } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 513471ab9b..3bf4aaa63d 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -141,6 +141,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) scanstate->ss.ps.plan = (Plan *) node; scanstate->ss.ps.state = estate; scanstate->ss.ps.ExecProcNode = ExecForeignScan; + scanstate->ss.ps.asyncstate = AS_AVAILABLE; + + if ((eflags & EXEC_FLAG_ASYNC) != 0) + scanstate->fs_async = 
true; /* * Miscellaneous initialization @@ -384,3 +388,20 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecForeignAsyncConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +bool +ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes, + void *caller_data, bool reinit) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); + return fdwroutine->ForeignAsyncConfigureWait(node, wes, + caller_data, reinit); +} diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 2719ea45a3..05b625783b 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper) return a; } +/* + * bms_del_range + * Delete members in the range of 'lower' to 'upper' from the set. + * + * Note this could also be done by calling bms_del_member in a loop, however, + * using this function will be faster when the range is large as we work at + * the bitmapword level rather than at bit level. 
+ */ +Bitmapset * +bms_del_range(Bitmapset *a, int lower, int upper) +{ + int lwordnum, + lbitnum, + uwordnum, + ushiftbits, + wordnum; + + if (lower < 0 || upper < 0) + elog(ERROR, "negative bitmapset member not allowed"); + if (lower > upper) + elog(ERROR, "lower range must not be above upper range"); + uwordnum = WORDNUM(upper); + + if (a == NULL) + { + a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + } + + /* ensure we have enough words to store the upper bit */ + else if (uwordnum >= a->nwords) + { + int oldnwords = a->nwords; + int i; + + a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + /* zero out the enlarged portion */ + for (i = oldnwords; i < a->nwords; i++) + a->words[i] = 0; + } + + wordnum = lwordnum = WORDNUM(lower); + + lbitnum = BITNUM(lower); + ushiftbits = BITNUM(upper) + 1; + + /* + * Special case when lwordnum is the same as uwordnum we must perform the + * upper and lower masking on the word. + */ + if (lwordnum == uwordnum) + { + a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1) + | (~(bitmapword) 0) << ushiftbits); + } + else + { + /* turn off lbitnum and all bits left of it */ + a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1); + + /* turn off all bits for any intermediate words */ + while (wordnum < uwordnum) + a->words[wordnum++] = (bitmapword) 0; + + /* turn off upper's bit and all bits right of it. 
*/ + a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits; + } + + return a; +} + /* * bms_int_members - like bms_intersect, but left input is recycled */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d8cf87e6d0..89a49e2fdc 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -121,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(async_capable); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -246,6 +247,8 @@ _copyAppend(const Append *from) COPY_NODE_FIELD(appendplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); + COPY_SCALAR_FIELD(nasyncplans); + COPY_SCALAR_FIELD(referent); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index e2f177515d..d4bb44b268 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -334,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(async_capable); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -436,6 +437,8 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(appendplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); + WRITE_INT_FIELD(nasyncplans); + WRITE_INT_FIELD(referent); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 42050ab719..63af7c02d8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1572,6 +1572,7 @@ ReadCommonPlan(Plan *local_node) READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(async_capable); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); 
READ_NODE_FIELD(qual); @@ -1672,6 +1673,8 @@ _readAppend(void) READ_NODE_FIELD(appendplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); + READ_INT_FIELD(nasyncplans); + READ_INT_FIELD(referent); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 744eed187d..ba18dd88a8 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -300,6 +300,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root, List *rowMarks, OnConflictExpr *onconflict, int epqParam); static GatherMerge *create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path); +static bool is_async_capable_path(Path *path); /* @@ -1082,6 +1083,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) bool tlist_was_changed = false; List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; + List *asyncplans = NIL; + List *syncplans = NIL; + List *asyncpaths = NIL; + List *syncpaths = NIL; + List *newsubpaths = NIL; ListCell *subpaths; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; @@ -1090,6 +1096,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; + int nasyncplans = 0; + bool first = true; + bool referent_is_sync = true; /* * The subpaths list could be empty, if every child was proven empty by @@ -1219,9 +1228,36 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } } - subplans = lappend(subplans, subplan); + /* + * Classify as async-capable or not. If we have decided to run the + * children in parallel, we cannot run any one of them asynchronously. 
+ */ + if (!best_path->path.parallel_safe && is_async_capable_path(subpath)) + { + subplan->async_capable = true; + asyncplans = lappend(asyncplans, subplan); + asyncpaths = lappend(asyncpaths, subpath); + ++nasyncplans; + if (first) + referent_is_sync = false; + } + else + { + syncplans = lappend(syncplans, subplan); + syncpaths = lappend(syncpaths, subpath); + } + + first = false; } + /* + * subplans contains the async plans in the first half, if any, and the + * sync plans in the second half, if any. We need the same ordering for + * subpaths to keep partition pruning information in sync with subplans. + */ + subplans = list_concat(asyncplans, syncplans); + newsubpaths = list_concat(asyncpaths, syncpaths); + /* * If any quals exist, they may be useful to perform further partition * pruning during execution. Gather information needed by the executor to @@ -1249,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) if (prunequal != NIL) partpruneinfo = make_partition_pruneinfo(root, rel, - best_path->subpaths, + newsubpaths, best_path->partitioned_rels, prunequal); } @@ -1257,6 +1293,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) plan->appendplans = subplans; plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; + plan->nasyncplans = nasyncplans; + plan->referent = referent_is_sync ? nasyncplans : 0; copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -7016,3 +7054,27 @@ is_projection_capable_plan(Plan *plan) } return true; } + +/* + * is_async_capable_path + * Check whether a given Path node is async-capable.
+ */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + default: + break; + } + return false; +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index d7f99d9944..79a2562454 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3882,6 +3882,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; + case WAIT_EVENT_ASYNC_WAIT: + event_name = "AsyncExecWait"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index ffcb54968f..a4de6d90e2 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -300,7 +300,7 @@ SysLoggerMain(int argc, char *argv[]) * syslog pipe, which implies that all other backends have exited * (including the postmaster). */ - wes = CreateWaitEventSet(CurrentMemoryContext, 2); + wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); #ifndef WIN32 AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL); diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 076c3c019f..f7b5587d7f 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4584,10 +4584,14 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * tlists according to one of the children, and the first one is the most * natural choice. Likewise special-case ModifyTable to pretend that the * first child plan is the OUTER referent; this is to support RETURNING - * lists containing references to non-target relations. 
+ * lists containing references to non-target relations. For Append, use the + * explicitly specified referent. */ if (IsA(plan, Append)) - dpns->outer_plan = linitial(((Append *) plan)->appendplans); + { + Append *app = (Append *) plan; + dpns->outer_plan = list_nth(app->appendplans, app->referent); + } else if (IsA(plan, MergeAppend)) dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans); else if (IsA(plan, ModifyTable)) diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index 237ca9fa30..27742a1641 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -1416,7 +1416,7 @@ void ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events) { /* - * XXXX: There's no property to show as an identier of a wait event set, + * XXXX: There's no property to show as an identifier of a wait event set, * use its pointer instead. */ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events))) @@ -1431,7 +1431,7 @@ static void PrintWESLeakWarning(WaitEventSet *events) { /* - * XXXX: There's no property to show as an identier of a wait event set, + * XXXX: There's no property to show as an identifier of a wait event set, * use its pointer instead. 
*/ elog(WARNING, "wait event set leak: %p still referenced", diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h new file mode 100644 index 0000000000..3b6bf4a516 --- /dev/null +++ b/src/include/executor/execAsync.h @@ -0,0 +1,22 @@ +/*-------------------------------------------------------------------- + * execAsync.h + * Support functions for asynchronous query execution + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAsync.h + *-------------------------------------------------------------------- + */ +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" +#include "storage/latch.h" + +extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit); +extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, + long timeout); +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index c7deeac662..aca9e2bddd 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -59,6 +59,7 @@ #define EXEC_FLAG_MARK 0x0008 /* need mark/restore */ #define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */ #define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */ +#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */ /* Hook for plugins to get control in ExecutorStart() */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 326d713ebf..71a233b41f 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void
ExecShutdownForeignScan(ForeignScanState *node); +extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, bool reinit); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..853ba2b5ad 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); +typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, + bool reinit); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler @@ -190,6 +195,7 @@ typedef struct FdwRoutine GetForeignPlan_function GetForeignPlan; BeginForeignScan_function BeginForeignScan; IterateForeignScan_function IterateForeignScan; + IterateForeignScan_function IterateForeignScanAsync; ReScanForeignScan_function ReScanForeignScan; EndForeignScan_function EndForeignScan; @@ -242,6 +248,11 @@ typedef struct FdwRoutine InitializeDSMForeignScan_function InitializeDSMForeignScan; ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + ShutdownForeignScan_function ShutdownForeignScan; /* Support functions for path reparameterization. 
*/ diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h index d113c271ee..177e6218cb 100644 --- a/src/include/nodes/bitmapset.h +++ b/src/include/nodes/bitmapset.h @@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b); +extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b); /* support for iterating through the integer elements of a set: */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 98e0072b8a..cd50494c74 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -938,6 +938,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate); * abstract superclass for all PlanState-type nodes. 
* ---------------- */ +typedef enum AsyncState +{ + AS_AVAILABLE, + AS_WAITING +} AsyncState; + typedef struct PlanState { NodeTag type; @@ -1026,6 +1032,11 @@ typedef struct PlanState bool outeropsset; bool inneropsset; bool resultopsset; + + /* Async subnode execution stuff */ + AsyncState asyncstate; + + int32 padding; /* to keep alignment of derived types */ } PlanState; /* ---------------- @@ -1221,14 +1232,21 @@ struct AppendState PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; - int as_whichplan; + int as_whichsyncplan; /* which sync plan is being executed */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ + int as_nasyncplans; /* # of async-capable children */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; - Bitmapset *as_valid_subplans; + Bitmapset *as_valid_syncsubplans; bool (*choose_next_subplan) (AppendState *); + bool as_syncdone; /* all synchronous plans done? */ + Bitmapset *as_needrequest; /* async plans needing a new request */ + Bitmapset *as_pending_async; /* pending async plans */ + TupleTableSlot **as_asyncresult; /* results of each async plan */ + int as_nasyncresult; /* # of valid entries in as_asyncresult */ + bool as_exec_prune; /* runtime pruning needed for async exec? 
*/ }; /* ---------------- @@ -1796,6 +1814,7 @@ typedef struct ForeignScanState Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; + bool fs_async; void *fdw_state; /* foreign-data wrapper can keep state here */ } ForeignScanState; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 83e01074ed..abad89b327 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -135,6 +135,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous execution logic? */ + /* * Common structural data for all Plan types. */ @@ -262,6 +267,10 @@ typedef struct Append /* Info for run-time subplan pruning; NULL if we're not doing that */ struct PartitionPruneInfo *part_prune_info; + + /* Async child node execution stuff */ + int nasyncplans; /* # async subplans, always at start of list */ + int referent; /* index of inheritance tree referent */ } Append; /* ---------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c55dc1481c..2259910637 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -887,7 +887,8 @@ typedef enum WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, - WAIT_EVENT_XACT_GROUP_UPDATE + WAIT_EVENT_XACT_GROUP_UPDATE, + WAIT_EVENT_ASYNC_WAIT } WaitEventIPC; /* ---------- -- 2.18.2