From 60a9ba9e74666dba290f6bf27225384966d272a9 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Mon, 17 Oct 2016 15:54:32 +0900 Subject: [PATCH 3/7] Modify async execution infrastructure. --- contrib/postgres_fdw/expected/postgres_fdw.out | 68 ++++++++-------- contrib/postgres_fdw/postgres_fdw.c | 5 +- src/backend/executor/execAsync.c | 105 ++++++++++++++----------- src/backend/executor/nodeAppend.c | 50 ++++++------ src/backend/executor/nodeForeignscan.c | 4 +- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfuncs.c | 1 + src/backend/nodes/readfuncs.c | 1 + src/backend/optimizer/plan/createplan.c | 24 +++++- src/backend/utils/adt/ruleutils.c | 6 +- src/include/executor/nodeForeignscan.h | 2 +- src/include/foreign/fdwapi.h | 2 +- src/include/nodes/execnodes.h | 10 ++- src/include/nodes/plannodes.h | 1 + 14 files changed, 167 insertions(+), 113 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index d7420e0..fd8b628 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6402,13 +6402,13 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------- LockRows - Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid -> Hash Join - Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid - Hash Cond: (bar2.f1 = foo2.f1) + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, 
foo.tableoid + Hash Cond: (bar.f1 = foo.f1) -> Append -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid @@ -6416,10 +6416,10 @@ select * from bar where f1 in (select f1 from foo) for update; -> Seq Scan on public.bar Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Hash - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> HashAggregate - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 - Group Key: foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Group Key: foo.f1 -> Append -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 @@ -6439,13 +6439,13 @@ select * from bar where f1 in (select f1 from foo) for update; explain (verbose, costs off) select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------- LockRows - Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid -> Hash Join - Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid - Hash Cond: (bar2.f1 = foo2.f1) + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid + Hash Cond: (bar.f1 = foo.f1) -> Append -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid @@ -6453,10 +6453,10 @@ select * from bar where f1 in (select f1 from foo) for share; -> Seq Scan on public.bar Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Hash - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> HashAggregate - 
Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 - Group Key: foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Group Key: foo.f1 -> Append -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 @@ -6477,22 +6477,22 @@ select * from bar where f1 in (select f1 from foo) for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) update bar set f2 = f2 + 100 where f1 in (select f1 from foo); - QUERY PLAN ---------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------- Update on public.bar Update on public.bar Foreign Update on public.bar2 Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1 -> Hash Join - Output: bar.f1, (bar.f2 + 100), bar.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid - Hash Cond: (bar.f1 = foo2.f1) + Output: bar.f1, (bar.f2 + 100), bar.ctid, foo.ctid, foo.*, foo.tableoid + Hash Cond: (bar.f1 = foo.f1) -> Seq Scan on public.bar Output: bar.f1, bar.f2, bar.ctid -> Hash - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> HashAggregate - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 - Group Key: foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Group Key: foo.f1 -> Append -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 @@ -6500,16 +6500,16 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); -> Seq Scan on public.foo Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Hash Join - Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid - Hash Cond: (bar2.f1 = foo2.f1) + Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid + Hash Cond: (bar2.f1 = foo.f1) -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, 
bar2.f3, bar2.ctid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE -> Hash - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> HashAggregate - Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1 - Group Key: foo2.f1 + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Group Key: foo.f1 -> Append -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 @@ -6543,8 +6543,8 @@ where bar.f1 = ss.f1; Foreign Update on public.bar2 Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1 -> Hash Join - Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo2.f1)) - Hash Cond: (foo2.f1 = bar.f1) + Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1)) + Hash Cond: (foo.f1 = bar.f1) -> Append -> Foreign Scan on public.foo2 Output: ROW(foo2.f1), foo2.f1 @@ -6561,8 +6561,8 @@ where bar.f1 = ss.f1; -> Seq Scan on public.bar Output: bar.f1, bar.f2, bar.ctid -> Merge Join - Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo2.f1)) - Merge Cond: (bar2.f1 = foo2.f1) + Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo.f1)) + Merge Cond: (bar2.f1 = foo.f1) -> Sort Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid Sort Key: bar2.f1 @@ -6570,8 +6570,8 @@ where bar.f1 = ss.f1; Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE -> Sort - Output: (ROW(foo2.f1)), foo2.f1 - Sort Key: foo2.f1 + Output: (ROW(foo.f1)), foo.f1 + Sort Key: foo.f1 -> Append -> Foreign Scan on public.foo2 Output: ROW(foo2.f1), foo2.f1 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index f180838..abb256b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -354,7 +354,7 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); static void postgresForeignAsyncRequest(EState *estate, 
PendingAsyncRequest *areq); -static void postgresForeignAsyncConfigureWait(EState *estate, +static bool postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit); static void postgresForeignAsyncNotify(EState *estate, @@ -4477,11 +4477,12 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq) ExecAsyncRequestDone(estate, areq, (Node *) slot); } -static void +static bool postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit) { elog(ERROR, "postgresForeignAsyncConfigureWait"); + return false; } static void diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index e070c26..33496a9 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -22,7 +22,7 @@ #include "storage/latch.h" static bool ExecAsyncEventWait(EState *estate, long timeout); -static void ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, +static bool ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit); static void ExecAsyncNotify(EState *estate, PendingAsyncRequest *areq); static void ExecAsyncResponse(EState *estate, PendingAsyncRequest *areq); @@ -43,7 +43,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index, PlanState *requestee) { PendingAsyncRequest *areq = NULL; - int i = estate->es_num_pending_async; + int nasync = estate->es_num_pending_async; /* * If the number of pending asynchronous nodes exceeds the number of @@ -51,7 +51,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index, * We start with 16 slots, and thereafter double the array size each * time we run out of slots. */ - if (i >= estate->es_max_pending_async) + if (nasync >= estate->es_max_pending_async) { int newmax; @@ -81,25 +81,28 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index, * PendingAsyncRequest if there is one. If not, we must allocate a new * one. 
*/ - if (estate->es_pending_async[i] == NULL) + if (estate->es_pending_async[nasync] == NULL) { areq = MemoryContextAllocZero(estate->es_query_cxt, sizeof(PendingAsyncRequest)); - estate->es_pending_async[i] = areq; + estate->es_pending_async[nasync] = areq; } else { - areq = estate->es_pending_async[i]; + areq = estate->es_pending_async[nasync]; MemSet(areq, 0, sizeof(PendingAsyncRequest)); } - areq->myindex = estate->es_num_pending_async++; + areq->myindex = estate->es_num_pending_async; /* Initialize the new request. */ areq->requestor = requestor; areq->request_index = request_index; areq->requestee = requestee; - /* Give the requestee a chance to do whatever it wants. */ + /* + * Give the requestee a chance to do whatever it wants. + * It marks the request ASYNC_COMPLETE if a result is already available. + */ switch (nodeTag(requestee)) { case T_ForeignScanState: @@ -110,6 +113,20 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index, elog(ERROR, "unrecognized node type: %d", (int) nodeTag(requestee)); } + + /* + * If a result is available, complete it immediately. + */ + if (areq->state == ASYNC_COMPLETE) + { + Assert(areq->result == NULL || IsA(areq->result, TupleTableSlot)); + ExecAsyncResponse(estate, areq); + + return; + } + + /* No result available now, make this node pending */ + estate->es_num_pending_async++; } /* @@ -175,22 +192,19 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout) { PendingAsyncRequest *areq = estate->es_pending_async[i]; - /* Skip it if no callback is pending. */ - if (!areq->callback_pending) - continue; - - /* - * Mark it as no longer needing a callback. We must do this - * before dispatching the callback in case the callback resets - * the flag. - */ - areq->callback_pending = false; - estate->es_async_callback_pending--; - - /* Perform the actual callback; set request_done if appropraite. */ - if (!areq->request_complete) + /* Skip it if not pending. 
*/ + if (areq->state == ASYNC_CALLBACK_PENDING) + { + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + estate->es_async_callback_pending--; ExecAsyncNotify(estate, areq); - else + } + + if (areq->state == ASYNC_COMPLETE) { any_node_done = true; if (requestor == areq->requestor) @@ -214,7 +228,7 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout) PendingAsyncRequest *head; PendingAsyncRequest *tail = estate->es_pending_async[tidx]; - if (!tail->callback_pending && tail->request_complete) + if (tail->state == ASYNC_COMPLETE) continue; head = estate->es_pending_async[hidx]; estate->es_pending_async[tidx] = head; @@ -247,7 +261,8 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout) * means wait forever, 0 means don't wait at all, and >0 means wait for the * indicated number of milliseconds. * - * Returns true if we found some events and false if we timed out. + * Returns true if we found some events and false if we timed out or there's + * no event to wait. The latter is occur when the areq is processed during */ static bool ExecAsyncEventWait(EState *estate, long timeout) @@ -258,6 +273,7 @@ ExecAsyncEventWait(EState *estate, long timeout) int n; bool reinit = false; bool process_latch_set = false; + bool added = false; if (estate->es_wait_event_set == NULL) { @@ -282,13 +298,16 @@ ExecAsyncEventWait(EState *estate, long timeout) PendingAsyncRequest *areq = estate->es_pending_async[i]; if (areq->num_fd_events > 0) - ExecAsyncConfigureWait(estate, areq, reinit); + added |= ExecAsyncConfigureWait(estate, areq, reinit); } + Assert(added); + /* Wait for at least one event to occur. 
*/ noccurred = WaitEventSetWait(estate->es_wait_event_set, timeout, occurred_event, EVENT_BUFFER_SIZE, WAIT_EVENT_ASYNC_WAIT); + if (noccurred == 0) return false; @@ -312,12 +331,10 @@ ExecAsyncEventWait(EState *estate, long timeout) { PendingAsyncRequest *areq = w->user_data; - if (!areq->callback_pending) - { - Assert(!areq->request_complete); - areq->callback_pending = true; - estate->es_async_callback_pending++; - } + Assert(areq->state == ASYNC_WAITING); + + areq->state = ASYNC_CALLBACK_PENDING; + estate->es_async_callback_pending++; } } @@ -333,8 +350,8 @@ ExecAsyncEventWait(EState *estate, long timeout) if (areq->wants_process_latch) { - Assert(!areq->request_complete); - areq->callback_pending = true; + Assert(areq->state == ASYNC_WAITING); + areq->state = ASYNC_CALLBACK_PENDING; } } } @@ -352,15 +369,19 @@ ExecAsyncEventWait(EState *estate, long timeout) * The events should include only WL_SOCKET_READABLE or WL_SOCKET_WRITEABLE, * and the number of calls should not exceed areq->num_fd_events (as * prevously set via ExecAsyncSetRequiredEvents). + * + * Individual requests can omit registering an event but it is a + * responsibility of the node driver to set at least one event per one + * requestor. 
*/ -static void +static bool ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit) { switch (nodeTag(areq->requestee)) { case T_ForeignScanState: - ExecAsyncForeignScanConfigureWait(estate, areq, reinit); + return ExecAsyncForeignScanConfigureWait(estate, areq, reinit); break; default: elog(ERROR, "unrecognized node type: %d", @@ -419,6 +440,7 @@ ExecAsyncSetRequiredEvents(EState *estate, PendingAsyncRequest *areq, estate->es_total_fd_events += num_fd_events - areq->num_fd_events; areq->num_fd_events = num_fd_events; areq->wants_process_latch = wants_process_latch; + areq->state = ASYNC_WAITING; if (force_reset && estate->es_wait_event_set != NULL) { @@ -448,17 +470,12 @@ ExecAsyncRequestDone(EState *estate, PendingAsyncRequest *areq, Node *result) * need a callback to remove registered wait events. It's not clear * that we would come out ahead, so use brute force for now. */ + Assert(areq->state == ASYNC_IDLE || areq->state == ASYNC_CALLBACK_PENDING); + if (areq->num_fd_events > 0 || areq->wants_process_latch) ExecAsyncSetRequiredEvents(estate, areq, 0, false, true); /* Save result and mark request as complete. */ areq->result = result; - areq->request_complete = true; - - /* Make sure this request is flagged for a callback. */ - if (!areq->callback_pending) - { - areq->callback_pending = true; - estate->es_async_callback_pending++; - } + areq->state = ASYNC_COMPLETE; } diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index e61218a..568fa25 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -229,9 +229,15 @@ ExecAppend(AppendState *node) */ while ((i = bms_first_member(node->as_needrequest)) >= 0) { - ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]); node->as_nasyncpending++; + + ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]); + /* If this request immediately gives a result, take it. 
*/ + if (node->as_nasyncresult > 0) + return node->as_asyncresult[--node->as_nasyncresult]; } + if (node->as_nasyncpending == 0 && node->as_syncdone) + return ExecClearTuple(node->ps.ps_ResultTupleSlot); } for (;;) @@ -246,32 +252,32 @@ ExecAppend(AppendState *node) { long timeout = node->as_syncdone ? -1 : 0; - for (;;) + while (node->as_nasyncpending > 0) { - if (node->as_nasyncpending == 0) - { - /* - * If there is no asynchronous activity still pending - * and the synchronous activity is also complete, we're - * totally done scanning this node. Otherwise, we're - * done with the asynchronous stuff but must continue - * scanning the synchronous children. - */ - if (node->as_syncdone) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); - break; - } - if (!ExecAsyncEventLoop(node->ps.state, &node->ps, timeout)) - { - /* Timeout reached. */ - break; - } - if (node->as_nasyncresult > 0) + if (ExecAsyncEventLoop(node->ps.state, &node->ps, timeout) && + node->as_nasyncresult > 0) { /* Asynchronous subplan returned a tuple! */ --node->as_nasyncresult; return node->as_asyncresult[node->as_nasyncresult]; } + + /* Timeout reached. Go through to sync nodes if exists */ + if (!node->as_syncdone) + break; + } + + /* + * If there is no asynchronous activity still pending and the + * synchronous activity is also complete, we're totally done + * scanning this node. Otherwise, we're done with the + * asynchronous stuff but must continue scanning the synchronous + * children. + */ + if (node->as_syncdone) + { + Assert(node->as_nasyncpending == 0); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); } } @@ -397,7 +403,7 @@ ExecAsyncAppendResponse(EState *estate, PendingAsyncRequest *areq) TupleTableSlot *slot; /* We shouldn't be called until the request is complete. */ - Assert(areq->request_complete); + Assert(areq->state == ASYNC_COMPLETE); /* Our result slot shouldn't already be occupied. 
*/ Assert(TupIsNull(node->ps.ps_ResultTupleSlot)); diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 61899d1..85dad79 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -376,7 +376,7 @@ ExecAsyncForeignScanRequest(EState *estate, PendingAsyncRequest *areq) * In async mode, configure for a wait * ---------------------------------------------------------------- */ -void +bool ExecAsyncForeignScanConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit) { @@ -384,7 +384,7 @@ ExecAsyncForeignScanConfigureWait(EState *estate, FdwRoutine *fdwroutine = node->fdwroutine; Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); - fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit); + return fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit); } /* ---------------------------------------------------------------- diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index a8cabdf..c62aaf2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -237,6 +237,7 @@ _copyAppend(const Append *from) */ COPY_NODE_FIELD(appendplans); COPY_SCALAR_FIELD(nasyncplans); + COPY_SCALAR_FIELD(referent); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index a894a9d..c2e34a8 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -370,6 +370,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(appendplans); WRITE_INT_FIELD(nasyncplans); + WRITE_INT_FIELD(referent); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 67439ec..9837eff 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1540,6 +1540,7 @@ _readAppend(void) READ_NODE_FIELD(appendplans); READ_INT_FIELD(nasyncplans); + READ_INT_FIELD(referent); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c 
b/src/backend/optimizer/plan/createplan.c index 968f8be..a9164ab 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -194,7 +194,8 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual, Index scanrelid, int ctePlanId, int cteParam); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *asyncplans, int nasyncplans, List *tlist); +static Append *make_append(List *asyncplans, int nasyncplans, + int referent, List *tlist); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -966,6 +967,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) List *syncplans = NIL; ListCell *subpaths; int nasyncplans = 0; + bool first = true; + bool referent_is_sync = true; /* * The subpaths list could be empty, if every child was proven empty by @@ -991,7 +994,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) return plan; } - /* Build the plan for each child */ + /* + * Build the plan for each child. + * + * The first child in an inheritance set is the representative used when + * explaining tlist entries (see set_deparse_planstate). Async subplans are + * moved to the front of the list, so remember where the first child of + * best_path->subpaths ends up and pass that index to make_append. + */ foreach(subpaths, best_path->subpaths) { Path *subpath = (Path *) lfirst(subpaths); @@ -1005,9 +1015,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) { asyncplans = lappend(asyncplans, subplan); ++nasyncplans; + if (first) + referent_is_sync = false; } else syncplans = lappend(syncplans, subplan); + + first = false; } /* @@ -1017,7 +1031,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. */ - plan = make_append(list_concat(asyncplans, syncplans), nasyncplans, tlist); + plan = make_append(list_concat(asyncplans, syncplans), nasyncplans, + referent_is_sync ? 
nasyncplans : 0, tlist); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5019,7 +5034,7 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, int nasyncplans, List *tlist) +make_append(List *appendplans, int nasyncplans, int referent, List *tlist) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5030,6 +5045,7 @@ make_append(List *appendplans, int nasyncplans, List *tlist) plan->righttree = NULL; node->appendplans = appendplans; node->nasyncplans = nasyncplans; + node->referent = referent; return node; } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index f26175e..37fc817 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4242,7 +4242,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps) * lists containing references to non-target relations. */ if (IsA(ps, AppendState)) - dpns->outer_planstate = ((AppendState *) ps)->appendplans[0]; + { + int idx = ((Append*)(((AppendState *) ps)->ps.plan))->referent; + dpns->outer_planstate = + ((AppendState *) ps)->appendplans[idx]; + } else if (IsA(ps, MergeAppendState)) dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0]; else if (IsA(ps, ModifyTableState)) diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 5a61306..2d9a62b 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -31,7 +31,7 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node, extern void ExecAsyncForeignScanRequest(EState *estate, PendingAsyncRequest *areq); -extern void ExecAsyncForeignScanConfigureWait(EState *estate, +extern bool ExecAsyncForeignScanConfigureWait(EState *estate, PendingAsyncRequest *areq, bool reinit); extern void ExecAsyncForeignScanNotify(EState *estate, PendingAsyncRequest *areq); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 
4c50f1e..41fc76f 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -158,7 +158,7 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); typedef void (*ForeignAsyncRequest_function) (EState *estate, PendingAsyncRequest *areq); -typedef void (*ForeignAsyncConfigureWait_function) (EState *estate, +typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate, PendingAsyncRequest *areq, bool reinit); typedef void (*ForeignAsyncNotify_function) (EState *estate, diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 29f3d7c..9b43fd6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -357,6 +357,13 @@ typedef struct ResultRelInfo * State for an asynchronous tuple request. * ---------------- */ +typedef enum AsyncRequestState +{ + ASYNC_IDLE, + ASYNC_WAITING, + ASYNC_CALLBACK_PENDING, + ASYNC_COMPLETE +} AsyncRequestState; typedef struct PendingAsyncRequest { int myindex; /* Index in es_pending_async. */ @@ -365,8 +372,7 @@ typedef struct PendingAsyncRequest int request_index; /* Scratch space for requestor. */ int num_fd_events; /* Max number of FD events requestee needs. */ bool wants_process_latch; /* Requestee cares about MyLatch. */ - bool callback_pending; /* Callback is needed. */ - bool request_complete; /* Request complete, result valid. */ + AsyncRequestState state; Node *result; /* Result (NULL if no more tuples). */ } PendingAsyncRequest; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index f0daada..ebbc78d 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -229,6 +229,7 @@ typedef struct Append Plan plan; List *appendplans; int nasyncplans; /* # of async plans, always at start of list */ + int referent; /* index of inheritance tree referent */ } Append; /* ---------------- -- 2.9.2