From 0b279ad32ea441580ead8056c855119c3d871aca Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Thu, 23 Feb 2017 15:04:46 +0900 Subject: [PATCH 3/4] Make postgres_fdw async-capable. Make postgre_fdw async-capable using the infrastructure. Additionaly, this makes connections for postgres_fdw have a connection-specific area to store information so that foreign scans on the same connection can share some data. postgres_fdw shares scan node currently running on the underlying connection. This allows us async-execution of multiple foreign scans on one foreign server. --- contrib/postgres_fdw/connection.c | 79 ++-- contrib/postgres_fdw/expected/postgres_fdw.out | 144 ++++--- contrib/postgres_fdw/postgres_fdw.c | 522 +++++++++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 2 + contrib/postgres_fdw/sql/postgres_fdw.sql | 12 +- 5 files changed, 595 insertions(+), 164 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8c33dea..0b1af3b 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -53,6 +53,7 @@ typedef struct ConnCacheEntry bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + void *storage; /* connection specific storage */ } ConnCacheEntry; /* @@ -68,6 +69,7 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ +static ConnCacheEntry *get_connection_entry(Oid umid); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); @@ -85,26 +87,12 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); - /* - * Get a PGconn which can be used to execute queries on the remote PostgreSQL - * server with the user's authorization. A new connection is established - * if we don't already have a suitable one, and a transaction is opened at - * the right subtransaction nesting depth if we didn't do that already. - * - * will_prep_stmt must be true if caller intends to create any prepared - * statements. Since those don't go away automatically at transaction end - * (not even on error), we need this flag to cue manual cleanup. - * - * XXX Note that caching connections theoretically requires a mechanism to - * detect change of FDW objects to invalidate already established connections. - * We could manage that by watching for invalidation events on the relevant - * syscaches. For the moment, though, it's not clear that this would really - * be useful and not mere pedantry. We could not flush any active connections - * mid-transaction anyway. + * Common function to acquire or create a connection cache entry. */ -PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) + +static ConnCacheEntry * +get_connection_entry(Oid umid) { bool found; ConnCacheEntry *entry; @@ -132,11 +120,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt) RegisterSubXactCallback(pgfdw_subxact_callback, NULL); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; + key = umid; /* * Find or create cached entry for requested connection. @@ -150,11 +135,42 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->have_prep_stmt = false; entry->have_error = false; entry->changing_xact_state = false; + entry->storage = NULL; } /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); + return entry; +} + +/* + * Get a PGconn which can be used to execute queries on the remote PostgreSQL + * server with the user's authorization. A new connection is established + * if we don't already have a suitable one, and a transaction is opened at + * the right subtransaction nesting depth if we didn't do that already. + * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. + * + * XXX Note that caching connections theoretically requires a mechanism to + * detect change of FDW objects to invalidate already established connections. + * We could manage that by watching for invalidation events on the relevant + * syscaches. For the moment, though, it's not clear that this would really + * be useful and not mere pedantry. We could not flush any active connections + * mid-transaction anyway. + */ +PGconn * +GetConnection(UserMapping *user, bool will_prep_stmt) +{ + ConnCacheEntry *entry; + + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + + entry = get_connection_entry(user->umid); + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -191,6 +207,25 @@ GetConnection(UserMapping *user, bool will_prep_stmt) } /* + * Rerturns the connection specific storage for this user. Allocate with + * initsize if not exists. + */ +void * +GetConnectionSpecificStorage(UserMapping *user, size_t initsize) +{ + ConnCacheEntry *entry; + + entry = get_connection_entry(user->umid); + if (entry->storage == NULL) + { + entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize); + memset(entry->storage, 0, initsize); + } + + return entry->storage; +} + +/* * Connect to remote server using specified server and user mapping properties. */ static PGconn * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index b112c19..7401304 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6417,12 +6417,12 @@ INSERT INTO b(aa) VALUES('bbbbb'); SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+------- - a | aaa - a | aaaa - a | aaaaa b | bbb b | bbbb b | bbbbb + a | aaa + a | aaaa + a | aaaaa (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6445,12 +6445,12 @@ UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - a | aaa - a | zzzzzz - a | zzzzzz b | bbb b | bbbb b | bbbbb + a | aaa + a | zzzzzz + a | zzzzzz (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6473,12 +6473,12 @@ UPDATE b SET aa = 'new'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - a | aaa - a | zzzzzz - a | zzzzzz b | new b | new b | new + a | aaa + a | zzzzzz + a | zzzzzz (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6501,12 +6501,12 @@ UPDATE a SET aa = 'newtoo'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - a | newtoo - a | newtoo - a | newtoo b | newtoo b | newtoo b | newtoo + a | newtoo + a | newtoo + a | newtoo (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6564,35 +6564,40 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid + Sort Key: bar.f1 + -> Seq Scan on public.bar + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.*, foo.tableoid, foo.f1 Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 +(28 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -6602,35 +6607,40 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid + Sort Key: bar.f1 + -> Seq Scan on public.bar + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.*, foo.tableoid, foo.f1 + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.*, foo.tableoid, foo.f1 Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 +(28 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 @@ -6660,11 +6670,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.*, foo.tableoid, foo.f1 Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 + -> Seq Scan on public.foo + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Hash Join Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid Inner Unique: true @@ -6678,11 +6688,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.*, foo.tableoid, foo.f1 Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.*, foo.tableoid, foo.f1 -> Foreign Scan on public.foo2 Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1 Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 + -> Seq Scan on public.foo + Output: foo.ctid, foo.*, foo.tableoid, foo.f1 (39 rows) update bar set f2 = f2 + 100 where f1 in (select f1 from foo); @@ -6713,16 +6723,16 @@ where bar.f1 = ss.f1; Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1)) Hash Cond: (foo.f1 = bar.f1) -> Append - -> Seq Scan on public.foo - Output: ROW(foo.f1), foo.f1 -> Foreign Scan on public.foo2 Output: ROW(foo2.f1), foo2.f1 Remote SQL: SELECT f1 FROM public.loct1 - -> Seq Scan on public.foo foo_1 - Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3) -> Foreign Scan on public.foo2 foo2_1 Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3) Remote SQL: SELECT f1 FROM public.loct1 + -> Seq Scan on public.foo + Output: ROW(foo.f1), foo.f1 + -> Seq Scan on public.foo foo_1 + Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3) -> Hash Output: bar.f1, bar.f2, bar.ctid -> Seq Scan on public.bar @@ -6740,16 +6750,16 @@ where bar.f1 = ss.f1; Output: (ROW(foo.f1)), foo.f1 Sort Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: ROW(foo.f1), foo.f1 -> Foreign Scan on public.foo2 Output: ROW(foo2.f1), foo2.f1 Remote SQL: SELECT f1 FROM public.loct1 - -> Seq Scan on public.foo foo_1 - Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3) -> Foreign Scan on public.foo2 foo2_1 Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3) Remote SQL: SELECT f1 FROM public.loct1 + -> Seq Scan on public.foo + Output: ROW(foo.f1), foo.f1 + -> Seq Scan on public.foo foo_1 + Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3) (45 rows) update bar set f2 = f2 + 100 @@ -6900,27 +6910,33 @@ delete from foo where f1 < 5 returning *; (5 rows) explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; - QUERY PLAN ------------------------------------------------------------------------------- - Update on public.bar - Output: bar.f1, bar.f2 - Update on public.bar - Foreign Update on public.bar2 - -> Seq Scan on public.bar - Output: bar.f1, (bar.f2 + 100), bar.ctid - -> Foreign Update on public.bar2 - Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 -(8 rows) +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; + QUERY PLAN +-------------------------------------------------------------------------------------- + Sort + Output: u.f1, u.f2 + Sort Key: u.f1 + CTE u + -> Update on public.bar + Output: bar.f1, bar.f2 + Update on public.bar + Foreign Update on public.bar2 + -> Seq Scan on public.bar + Output: bar.f1, (bar.f2 + 100), bar.ctid + -> Foreign Update on public.bar2 + Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 + -> CTE Scan on u + Output: u.f1, u.f2 +(14 rows) -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; f1 | f2 ----+----- 1 | 311 2 | 322 - 6 | 266 3 | 333 4 | 344 + 6 | 266 7 | 277 (6 rows) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 7214666..b09a099 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -20,6 +20,8 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" +#include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -34,6 +36,7 @@ #include "optimizer/var.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "pgstat.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -53,6 +56,9 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Retrive PgFdwScanState struct from ForeginScanState */ +#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state) + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -120,10 +126,27 @@ enum FdwDirectModifyPrivateIndex }; /* + * Connection private area structure. + */ +typedef struct PgFdwConnpriv +{ + ForeignScanState *current_owner; /* The node currently running a query + * on this connection*/ +} PgFdwConnpriv; + +/* Execution state base type */ +typedef struct PgFdwState +{ + PGconn *conn; /* connection for the scan */ + PgFdwConnpriv *connpriv; /* connection private memory */ +} PgFdwState; + +/* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ TupleDesc tupdesc; /* tuple descriptor of scan */ @@ -134,7 +157,7 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + bool result_ready; unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -150,6 +173,13 @@ typedef struct PgFdwScanState /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + bool run_async; /* true if run asynchronously */ + bool async_waiting; /* true if requesting the parent to wait */ + ForeignScanState *waiter; /* Next node to run a query among nodes + * sharing the same connection */ + ForeignScanState *last_waiter; /* A waiting node at the end of a waiting + * list. Maintained only by the current + * owner of the connection */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ @@ -163,11 +193,11 @@ typedef struct PgFdwScanState */ typedef struct PgFdwModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -190,6 +220,7 @@ typedef struct PgFdwModifyState */ typedef struct PgFdwDirectModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -288,6 +319,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresShutdownForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); @@ -348,6 +380,14 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(EState *estate, + PendingAsyncRequest *areq); +static bool postgresForeignAsyncConfigureWait(EState *estate, + PendingAsyncRequest *areq, + bool reinit); +static void postgresForeignAsyncNotify(EState *estate, + PendingAsyncRequest *areq); /* * Helper functions @@ -368,7 +408,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); +static void request_more_data(ForeignScanState *node); +static void fetch_received_data(ForeignScanState *node); +static void vacate_connection(PgFdwState *fdwconn); +static void absorb_current_result(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, @@ -438,6 +481,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; + routine->ShutdownForeignScan = postgresShutdownForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; @@ -472,6 +516,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for async execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -1322,12 +1372,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->s.conn = GetConnection(user, false); + fsstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); + fsstate->s.connpriv->current_owner = NULL; + fsstate->waiter = NULL; + fsstate->last_waiter = node; /* Assign a unique ID for my cursor */ - fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_number = GetCursorNumber(fsstate->s.conn); fsstate->cursor_exists = false; + /* Initialize async execution status */ + fsstate->run_async = false; + fsstate->async_waiting = false; + /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); @@ -1383,32 +1442,130 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. - */ - if (!fsstate->cursor_exists) - create_cursor(node); - - /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { - /* No point in another fetch if we already detected EOF, though. */ - if (!fsstate->eof_reached) - fetch_more_data(node); - /* If we didn't get any tuples, must be end of data. */ + ForeignScanState *next_conn_owner = node; + + /* This node has sent a query on this connection */ + if (fsstate->s.connpriv->current_owner == node) + { + /* Check if the result is available */ + if (PQisBusy(fsstate->s.conn)) + { + int rc = WaitLatchOrSocket(NULL, + WL_SOCKET_READABLE | WL_TIMEOUT, + PQsocket(fsstate->s.conn), 0, + WAIT_EVENT_ASYNC_WAIT); + if (fsstate->run_async && !(rc & WL_SOCKET_READABLE)) + { + /* + * This node is not ready yet. Tell the caller to wait. + */ + fsstate->result_ready = false; + return ExecClearTuple(slot); + } + } + + Assert(fsstate->async_waiting); + fsstate->async_waiting = false; + fetch_received_data(node); + + /* + * If someone is waiting this node on the same connection, let the + * first waiter be the next owner of this connection. + */ + if (fsstate->waiter) + { + PgFdwScanState *next_owner_state; + + next_conn_owner = fsstate->waiter; + next_owner_state = GetPgFdwScanState(next_conn_owner); + fsstate->waiter = NULL; + + /* + * only the current owner is responsible to maintain the shortcut + * to the last waiter + */ + next_owner_state->last_waiter = fsstate->last_waiter; + + /* + * for simplicity, last_waiter points itself on a node that no one + * is waiting for. + */ + fsstate->last_waiter = node; + } + } + else if (fsstate->s.connpriv->current_owner && + !GetPgFdwScanState(node)->eof_reached) + { + /* + * Anyone else is holding this connection and we want this node to + * run later. Add myself to the tail of the waiters' list then + * return not-ready. To avoid scanning through the waiters' list, + * the current owner is to maintain the shortcut to the last + * waiter. + */ + PgFdwScanState *conn_owner_state = + GetPgFdwScanState(fsstate->s.connpriv->current_owner); + ForeignScanState *last_waiter = conn_owner_state->last_waiter; + PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter); + + last_waiter_state->waiter = node; + conn_owner_state->last_waiter = node; + + /* Register the node to the async-waiting node list */ + Assert(!GetPgFdwScanState(node)->async_waiting); + + GetPgFdwScanState(node)->async_waiting = true; + + fsstate->result_ready = fsstate->eof_reached; + return ExecClearTuple(slot); + } + + /* At this time no node is running on the connection */ + Assert(GetPgFdwScanState(next_conn_owner)->s.connpriv->current_owner + == NULL); + /* + * Send the next request for the next owner of this connection if + * needed. + */ + if (!GetPgFdwScanState(next_conn_owner)->eof_reached) + { + PgFdwScanState *next_owner_state = + GetPgFdwScanState(next_conn_owner); + + request_more_data(next_conn_owner); + + /* Register the node to the async-waiting node list */ + if (!next_owner_state->async_waiting) + next_owner_state->async_waiting = true; + + if (!next_owner_state->run_async) + fetch_received_data(next_conn_owner); + } + + + /* + * If we haven't received a result for the given node this time, + * return with no tuple to give way to other nodes. + */ if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->result_ready = fsstate->eof_reached; return ExecClearTuple(slot); + } } /* * Return the next tuple. */ + fsstate->result_ready = true; ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, @@ -1424,7 +1581,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); char sql[64]; PGresult *res; @@ -1432,6 +1589,9 @@ postgresReScanForeignScan(ForeignScanState *node) if (!fsstate->cursor_exists) return; + /* Absorb the ramining result */ + absorb_current_result(node); + /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should @@ -1460,9 +1620,9 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql); PQclear(res); /* Now force a fresh FETCH. */ @@ -1480,7 +1640,7 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) @@ -1488,16 +1648,32 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->s.conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(fsstate->conn); - fsstate->conn = NULL; + ReleaseConnection(fsstate->s.conn); + fsstate->s.conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* + * postgresShutdownForeignScan + * Remove asynchrony stuff and cleanup garbage on the connection. + */ +static void +postgresShutdownForeignScan(ForeignScanState *node) +{ + ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; + + if (plan->operation != CMD_SELECT) + return; + + /* Absorb the ramining result */ + absorb_current_result(node); +} + +/* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table */ @@ -1699,7 +1875,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->s.conn = GetConnection(user, true); + fmstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data. */ @@ -1778,6 +1956,8 @@ postgresExecForeignInsert(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1788,14 +1968,14 @@ postgresExecForeignInsert(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1803,10 +1983,10 @@ postgresExecForeignInsert(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -1844,6 +2024,8 @@ postgresExecForeignUpdate(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1864,14 +2046,14 @@ postgresExecForeignUpdate(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1879,10 +2061,10 @@ postgresExecForeignUpdate(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -1920,6 +2102,8 @@ postgresExecForeignDelete(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1940,14 +2124,14 @@ postgresExecForeignDelete(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1955,10 +2139,10 @@ postgresExecForeignDelete(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -2005,16 +2189,16 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql); PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + ReleaseConnection(fmstate->s.conn); + fmstate->s.conn = NULL; } /* @@ -2302,7 +2486,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->s.conn = GetConnection(user, false); + dmstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ @@ -2355,7 +2541,10 @@ postgresIterateDirectModify(ForeignScanState *node) * If this is the first call after Begin, execute the statement. */ if (dmstate->num_tuples == -1) + { + vacate_connection((PgFdwState *)dmstate); execute_dml_stmt(node); + } /* * If the local query doesn't specify RETURNING, just clear tuple slot. @@ -2402,8 +2591,8 @@ postgresEndDirectModify(ForeignScanState *node) PQclear(dmstate->result); /* Release remote connection */ - ReleaseConnection(dmstate->conn); - dmstate->conn = NULL; + ReleaseConnection(dmstate->s.conn); + dmstate->s.conn = NULL; /* MemoryContext will be deleted automatically. */ } @@ -2522,6 +2711,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_param_join_conds; StringInfoData sql; PGconn *conn; + PgFdwConnpriv *connpriv; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; @@ -2564,6 +2754,16 @@ estimate_path_cost_size(PlannerInfo *root, /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); + connpriv = GetConnectionSpecificStorage(fpinfo->user, + sizeof(PgFdwConnpriv)); + if (connpriv) + { + PgFdwState tmpstate; + tmpstate.conn = conn; + tmpstate.connpriv = connpriv; + vacate_connection(&tmpstate); + } + get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -2918,11 +3118,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; StringInfoData buf; PGresult *res; @@ -2988,47 +3188,96 @@ create_cursor(ForeignScanState *node) * Fetch some more rows from the node's cursor. */ static void -fetch_more_data(ForeignScanState *node) +request_more_data(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + PGconn *conn = fsstate->s.conn; + char sql[64]; + + /* The connection should be vacant */ + Assert(fsstate->s.connpriv->current_owner == NULL); + + /* + * If this is the first call after Begin or ReScan, we need to create the + * cursor on the remote side. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + + fsstate->s.connpriv->current_owner = node; +} + +/* + * Fetch some more rows from the node's cursor. + */ +static void +fetch_received_data(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); PGresult *volatile res = NULL; MemoryContext oldcontext; + /* I should be the current connection owner */ + Assert(fsstate->s.connpriv->current_owner == node); + /* * We'll store the tuples in the batch_cxt. First, flush the previous - * batch. + * batch if no tuple is remaining */ - fsstate->tuples = NULL; - MemoryContextReset(fsstate->batch_cxt); + if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + MemoryContextReset(fsstate->batch_cxt); + } + else if (fsstate->next_tuple > 0) + { + /* move the remaining tuples to the beginning of the store */ + int n = 0; + + while(fsstate->next_tuple < fsstate->num_tuples) + fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++]; + fsstate->num_tuples = n; + } + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; char sql[64]; - int numrows; + int addrows; + size_t newsize; int i; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); - res = pgfdw_exec_query(conn, sql); + res = pgfdw_get_result(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + addrows = PQntuples(res); + newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple); + if (fsstate->tuples) + fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize); + else + fsstate->tuples = (HeapTuple *) palloc(newsize); - for (i = 0; i < numrows; i++) + for (i = 0; i < addrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan)); - fsstate->tuples[i] = + fsstate->tuples[fsstate->num_tuples + i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, @@ -3038,27 +3287,82 @@ fetch_more_data(ForeignScanState *node) } /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) + if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0) fsstate->fetch_ct_2++; + fsstate->next_tuple = 0; + fsstate->num_tuples += addrows; + /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fsstate->fetch_size); + fsstate->eof_reached = (addrows < fsstate->fetch_size); PQclear(res); res = NULL; } PG_CATCH(); { + fsstate->s.connpriv->current_owner = NULL; if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); + fsstate->s.connpriv->current_owner = NULL; + MemoryContextSwitchTo(oldcontext); } /* + * Vacate a connection so that this node can send the next query + */ +static void +vacate_connection(PgFdwState *fdwstate) +{ + PgFdwConnpriv *connpriv = fdwstate->connpriv; + ForeignScanState *owner; + + if (connpriv == NULL || connpriv->current_owner == NULL) + return; + + /* + * let the current connection owner read the result for the running query + */ + owner = connpriv->current_owner; + fetch_received_data(owner); + + /* Clear the waiting list */ + while (owner) + { + PgFdwScanState *fsstate = GetPgFdwScanState(owner); + + fsstate->last_waiter = NULL; + owner = fsstate->waiter; + fsstate->waiter = NULL; + } +} + +/* + * Absorb the result of the current query. + */ +static void +absorb_current_result(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *owner = fsstate->s.connpriv->current_owner; + + if (owner) + { + PgFdwScanState *target_state = GetPgFdwScanState(owner); + PGconn *conn = target_state->s.conn; + + while(PQisBusy(conn)) + PQclear(PQgetResult(conn)); + fsstate->s.connpriv->current_owner = NULL; + fsstate->async_waiting = false; + } +} +/* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * @@ -3142,7 +3446,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", - GetPrepStmtNumber(fmstate->conn)); + GetPrepStmtNumber(fmstate->s.conn)); p_name = pstrdup(prep_name); /* @@ -3152,12 +3456,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. */ - if (!PQsendPrepare(fmstate->conn, + if (!PQsendPrepare(fmstate->s.conn, p_name, fmstate->query, 0, NULL)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3165,9 +3469,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ @@ -3298,9 +3602,9 @@ execute_dml_stmt(ForeignScanState *node) * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ - if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query); /* * Get the result, and check for success. @@ -3308,10 +3612,10 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true, dmstate->query); /* Get the number of rows affected. */ @@ -4582,6 +4886,80 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + return true; +} + +/* + * Accept async request. Notify to the caller if the next tuple is immediately + * available. ExecForeignScan does additional work to finishing the returning + * tuple, so call it instead of postgresIterateForeignScan to acquire a tuple + * in expected shape. + */ +static void +postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + TupleTableSlot *slot; + + Assert(IsA(node, ForeignScanState)); + GetPgFdwScanState(node)->run_async = true; + slot = ExecForeignScan(node); + if (GetPgFdwScanState(node)->result_ready) + ExecAsyncRequestDone(estate, areq, (Node *) slot); + else + ExecAsyncSetRequiredEvents(estate, areq, 1, false, false); +} + +/* + * Configure waiting event. + * + * Add an wait event only when the node is the connection owner. Elsewise + * another node on this connection is the owner. + */ +static bool +postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, + bool reinit) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = GetPgFdwScanState(node); + + + /* If the caller didn't reinit, this event is already in event set */ + if (!reinit) + return true; + + if (fsstate->s.connpriv->current_owner == node) + { + AddWaitEventToSet(estate->es_wait_event_set, + WL_SOCKET_READABLE, PQsocket(fsstate->s.conn), + NULL, areq); + return true; + } + + return false; +} + +/* + * Process a notification from async mechanism. ExecForeignScan does + * additional work to complete the returning tuple, so call it instead of + * postgresIterateForeignScan to acquire a completed tuple. + */ +static void +postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + TupleTableSlot *slot; + + Assert(IsA(node, ForeignScanState)); + slot = ExecForeignScan(node); + Assert(GetPgFdwScanState(node)->result_ready); + + ExecAsyncRequestDone(estate, areq, (Node *) slot); +} + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in @@ -4946,7 +5324,7 @@ make_tuple_from_result_row(PGresult *res, PgFdwScanState *fdw_sstate; Assert(fsstate); - fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; + fdw_sstate = GetPgFdwScanState(fsstate); tupdesc = fdw_sstate->tupdesc; } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index f396dae..a67da3d 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -77,6 +77,7 @@ typedef struct PgFdwRelationInfo UserMapping *user; /* only set in use_remote_estimate mode */ int fetch_size; /* fetch size for this remote table */ + bool allow_prefetch; /* true to allow overlapped fetching */ /* * Name of the relation while EXPLAINing ForeignScan. It is used for join @@ -116,6 +117,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 509bb54..1f69908 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1542,12 +1542,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) @@ -1606,8 +1606,8 @@ explain (verbose, costs off) delete from foo where f1 < 5 returning *; delete from foo where f1 < 5 returning *; explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; drop table foo cascade; drop table bar cascade; -- 2.9.2