From 28025ab53215ce0cbbfc690bf053e600afa9fb4d Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Mon, 17 Oct 2016 16:00:56 +0900 Subject: [PATCH 4/7] Make postgres_fdw async-capable --- contrib/postgres_fdw/connection.c | 79 ++-- contrib/postgres_fdw/expected/postgres_fdw.out | 64 ++-- contrib/postgres_fdw/postgres_fdw.c | 483 +++++++++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 2 + contrib/postgres_fdw/sql/postgres_fdw.sql | 4 +- src/backend/executor/execProcnode.c | 9 + src/include/foreign/fdwapi.h | 2 + 7 files changed, 510 insertions(+), 133 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 7f7a744..64cc057 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -49,6 +49,7 @@ typedef struct ConnCacheEntry * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ + void *storage; /* connection specific storage */ } ConnCacheEntry; /* @@ -64,6 +65,7 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ +static ConnCacheEntry *get_connection_entry(Oid umid); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); @@ -75,26 +77,12 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId parentSubid, void *arg); - /* - * Get a PGconn which can be used to execute queries on the remote PostgreSQL - * server with the user's authorization. A new connection is established - * if we don't already have a suitable one, and a transaction is opened at - * the right subtransaction nesting depth if we didn't do that already. - * - * will_prep_stmt must be true if caller intends to create any prepared - * statements. 
Since those don't go away automatically at transaction end - * (not even on error), we need this flag to cue manual cleanup. - * - * XXX Note that caching connections theoretically requires a mechanism to - * detect change of FDW objects to invalidate already established connections. - * We could manage that by watching for invalidation events on the relevant - * syscaches. For the moment, though, it's not clear that this would really - * be useful and not mere pedantry. We could not flush any active connections - * mid-transaction anyway. + * Common function to acquire or create a connection cache entry. */ -PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) + +static ConnCacheEntry * +get_connection_entry(Oid umid) { bool found; ConnCacheEntry *entry; @@ -122,11 +110,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt) RegisterSubXactCallback(pgfdw_subxact_callback, NULL); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; + key = umid; /* * Find or create cached entry for requested connection. @@ -139,8 +124,39 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; + entry->storage = NULL; } + return entry; +} + +/* + * Get a PGconn which can be used to execute queries on the remote PostgreSQL + * server with the user's authorization. A new connection is established + * if we don't already have a suitable one, and a transaction is opened at + * the right subtransaction nesting depth if we didn't do that already. + * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. 
+ * + * XXX Note that caching connections theoretically requires a mechanism to + * detect change of FDW objects to invalidate already established connections. + * We could manage that by watching for invalidation events on the relevant + * syscaches. For the moment, though, it's not clear that this would really + * be useful and not mere pedantry. We could not flush any active connections + * mid-transaction anyway. + */ +PGconn * +GetConnection(UserMapping *user, bool will_prep_stmt) +{ + ConnCacheEntry *entry; + + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + + entry = get_connection_entry(user->umid); + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -177,6 +193,25 @@ GetConnection(UserMapping *user, bool will_prep_stmt) } /* + * Returns the connection specific storage for this user. Allocates it with + * initsize zeroed bytes if it does not exist yet. + */ +void * +GetConnectionSpecificStorage(UserMapping *user, size_t initsize) +{ + ConnCacheEntry *entry; + + entry = get_connection_entry(user->umid); + if (entry->storage == NULL) + { + entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize); + memset(entry->storage, 0, initsize); + } + + return entry->storage; +} + +/* * Connect to remote server using specified server and user mapping properties.
*/ static PGconn * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index fd8b628..5d448d1 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6254,12 +6254,12 @@ INSERT INTO b(aa) VALUES('bbbbb'); SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+------- - b | bbb - b | bbbb - b | bbbbb a | aaa a | aaaa a | aaaaa + b | bbb + b | bbbb + b | bbbbb (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6282,12 +6282,12 @@ UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - b | bbb - b | bbbb - b | bbbbb a | aaa a | zzzzzz a | zzzzzz + b | bbb + b | bbbb + b | bbbbb (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6310,12 +6310,12 @@ UPDATE b SET aa = 'new'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - b | new - b | new - b | new a | aaa a | zzzzzz a | zzzzzz + b | new + b | new + b | new (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6338,12 +6338,12 @@ UPDATE a SET aa = 'newtoo'; SELECT tableoid::regclass, * FROM a; tableoid | aa ----------+-------- - b | newtoo - b | newtoo - b | newtoo a | newtoo a | newtoo a | newtoo + b | newtoo + b | newtoo + b | newtoo (6 rows) SELECT tableoid::regclass, * FROM b; @@ -6431,9 +6431,9 @@ select * from bar where f1 in (select f1 from foo) for update; select * from bar where f1 in (select f1 from foo) for update; f1 | f2 ----+---- + 1 | 11 3 | 33 4 | 44 - 1 | 11 2 | 22 (4 rows) @@ -6468,9 +6468,9 @@ select * from bar where f1 in (select f1 from foo) for share; select * from bar where f1 in (select f1 from foo) for share; f1 | f2 ----+---- + 1 | 11 3 | 33 4 | 44 - 1 | 11 2 | 22 (4 rows) @@ -6733,27 +6733,33 @@ delete from foo where f1 < 5 returning *; (5 rows) explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; - QUERY PLAN 
------------------------------------------------------------------------------- - Update on public.bar - Output: bar.f1, bar.f2 - Update on public.bar - Foreign Update on public.bar2 - -> Seq Scan on public.bar - Output: bar.f1, (bar.f2 + 100), bar.ctid - -> Foreign Update on public.bar2 - Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 -(8 rows) +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; + QUERY PLAN +-------------------------------------------------------------------------------------- + Sort + Output: u.f1, u.f2 + Sort Key: u.f1 + CTE u + -> Update on public.bar + Output: bar.f1, bar.f2 + Update on public.bar + Foreign Update on public.bar2 + -> Seq Scan on public.bar + Output: bar.f1, (bar.f2 + 100), bar.ctid + -> Foreign Update on public.bar2 + Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 + -> CTE Scan on u + Output: u.f1, u.f2 +(14 rows) -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; f1 | f2 ----+----- - 2 | 322 1 | 311 - 6 | 266 + 2 | 322 3 | 333 4 | 344 + 6 | 266 7 | 277 (6 rows) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index abb256b..a52d54a 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -35,6 +35,7 @@ #include "optimizer/var.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "pgstat.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -54,6 +55,9 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Retrive PgFdwScanState struct from ForeginScanState */ +#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state) + /* * Indexes of FDW-private information stored in fdw_private lists. 
* @@ -123,10 +127,27 @@ enum FdwDirectModifyPrivateIndex }; /* + * Connection private area structure. + */ + typedef struct PgFdwConnspecate +{ + ForeignScanState *current_owner; /* The node currently running a query + * on this connection*/ +} PgFdwConnspecate; + +/* Execution state base type */ +typedef struct PgFdwState +{ + PGconn *conn; /* connection for the scan */ + PgFdwConnspecate *connspec; /* connection private memory */ +} PgFdwState; + +/* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ TupleDesc tupdesc; /* tuple descriptor of scan */ @@ -137,7 +158,7 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + bool result_ready; unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -153,6 +174,13 @@ typedef struct PgFdwScanState /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + bool run_async; /* true if run asynchronously */ + bool async_waiting; /* true if requesting the parent to wait */ + ForeignScanState *waiter; /* Next node to run a query among nodes + * sharing the same connection */ + ForeignScanState *last_waiter; /* A waiting node at the end of a waiting + * list. 
Maintained only by the current + * owner of the connection */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ @@ -166,11 +194,11 @@ typedef struct PgFdwScanState */ typedef struct PgFdwModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -193,6 +221,7 @@ typedef struct PgFdwModifyState */ typedef struct PgFdwDirectModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -291,6 +320,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresShutdownForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); @@ -355,8 +385,8 @@ static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); static void postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq); static bool postgresForeignAsyncConfigureWait(EState *estate, - PendingAsyncRequest *areq, - bool reinit); + PendingAsyncRequest *areq, + bool reinit); static void postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq); @@ -379,7 +409,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); +static 
void request_more_data(ForeignScanState *node); +static void fetch_received_data(ForeignScanState *node); +static void vacate_connection(PgFdwState *fdwconn); +static void absorb_current_result(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, @@ -444,6 +477,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; + routine->ShutdownForeignScan = postgresShutdownForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; @@ -1335,12 +1369,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->s.conn = GetConnection(user, false); + fsstate->s.connspec = (PgFdwConnspecate *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate)); + fsstate->s.connspec->current_owner = NULL; + fsstate->waiter = NULL; + fsstate->last_waiter = node; /* Assign a unique ID for my cursor */ - fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_number = GetCursorNumber(fsstate->s.conn); fsstate->cursor_exists = false; + /* Initialize async execution status */ + fsstate->run_async = false; + fsstate->async_waiting = false; + /* Get private info created by planner functions. 
*/ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); @@ -1396,32 +1439,126 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. - */ - if (!fsstate->cursor_exists) - create_cursor(node); - - /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { - /* No point in another fetch if we already detected EOF, though. */ - if (!fsstate->eof_reached) - fetch_more_data(node); - /* If we didn't get any tuples, must be end of data. */ + ForeignScanState *next_conn_owner = node; + + /* This node has sent a query on this connection */ + if (fsstate->s.connspec->current_owner == node) + { + /* Check if the result is available */ + if (PQisBusy(fsstate->s.conn)) + { + int rc = WaitLatchOrSocket(NULL, + WL_SOCKET_READABLE | WL_TIMEOUT, + PQsocket(fsstate->s.conn), 0, + WAIT_EVENT_ASYNC_WAIT); + if (fsstate->run_async && !(rc & WL_SOCKET_READABLE)) + { + /* + * This node is not ready yet. Tell the caller to wait. + */ + fsstate->result_ready = false; + return ExecClearTuple(slot); + } + } + + Assert(fsstate->async_waiting); + fsstate->async_waiting = false; + fetch_received_data(node); + + /* + * If someone is waiting this node on the same connection, let the + * first waiter be the next owner of this connection. 
+ */ + if (fsstate->waiter) + { + PgFdwScanState *next_owner_state; + + next_conn_owner = fsstate->waiter; + next_owner_state = GetPgFdwScanState(next_conn_owner); + fsstate->waiter = NULL; + + /* + * only the current owner is responsible to maintain the shortcut + * to the last waiter + */ + next_owner_state->last_waiter = fsstate->last_waiter; + + /* + * for simplicity, last_waiter points itself on a node that no one + * is waiting for. + */ + fsstate->last_waiter = node; + } + } + else if (fsstate->s.connspec->current_owner) + { + /* + * Anyone else is holding this connection. Add myself to the tail + * of the waiters' list then return not-ready. To avoid scanning + * through the waiters' list, the current owner is to maintain the + * shortcut to the last waiter. + */ + PgFdwScanState *conn_owner_state = + GetPgFdwScanState(fsstate->s.connspec->current_owner); + ForeignScanState *last_waiter = conn_owner_state->last_waiter; + PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter); + + last_waiter_state->waiter = node; + conn_owner_state->last_waiter = node; + + /* Register the node to the async-waiting node list */ + Assert(!GetPgFdwScanState(node)->async_waiting); + + GetPgFdwScanState(node)->async_waiting = true; + + fsstate->result_ready = fsstate->eof_reached; + return ExecClearTuple(slot); + } + + /* + * Send the next request for the next owner of this connection if + * needed. + */ + + if (!GetPgFdwScanState(next_conn_owner)->eof_reached) + { + PgFdwScanState *next_owner_state = + GetPgFdwScanState(next_conn_owner); + + request_more_data(next_conn_owner); + + /* Register the node to the async-waiting node list */ + if (!next_owner_state->async_waiting) + next_owner_state->async_waiting = true; + + if (!next_owner_state->run_async) + fetch_received_data(next_conn_owner); + } + + + /* + * If we haven't received a result for the given node this time, + * return with no tuple to give way to other nodes. 
+ */ if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->result_ready = fsstate->eof_reached; return ExecClearTuple(slot); + } } /* * Return the next tuple. */ + fsstate->result_ready = true; ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, @@ -1437,7 +1574,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); char sql[64]; PGresult *res; @@ -1445,6 +1582,9 @@ postgresReScanForeignScan(ForeignScanState *node) if (!fsstate->cursor_exists) return; + /* Absorb the remaining result */ + absorb_current_result(node); + /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should @@ -1473,9 +1613,9 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql); PQclear(res); /* Now force a fresh FETCH.
*/ @@ -1493,7 +1633,7 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) @@ -1501,16 +1641,32 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->s.conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(fsstate->conn); - fsstate->conn = NULL; + ReleaseConnection(fsstate->s.conn); + fsstate->s.conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* + * postgresShutdownForeignScan + * Remove asynchrony stuff and cleanup garbage on the connection. + */ +static void +postgresShutdownForeignScan(ForeignScanState *node) +{ + ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; + + if (plan->operation != CMD_SELECT) + return; + + /* Absorb the remaining result */ + absorb_current_result(node); +} + +/* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table */ @@ -1712,7 +1868,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->s.conn = GetConnection(user, true); + fmstate->s.connspec = (PgFdwConnspecate *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate)); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data.
*/ @@ -1791,6 +1949,8 @@ postgresExecForeignInsert(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1801,14 +1961,14 @@ postgresExecForeignInsert(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1816,10 +1976,10 @@ postgresExecForeignInsert(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -1857,6 +2017,8 @@ postgresExecForeignUpdate(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1877,14 +2039,14 @@ postgresExecForeignUpdate(EState *estate, /* * Execute the prepared statement. 
*/ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1892,10 +2054,10 @@ postgresExecForeignUpdate(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -1933,6 +2095,8 @@ postgresExecForeignDelete(EState *estate, PGresult *res; int n_rows; + vacate_connection((PgFdwState *)fmstate); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -1953,14 +2117,14 @@ postgresExecForeignDelete(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -1968,10 +2132,10 @@ postgresExecForeignDelete(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -2018,16 +2182,16 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql); PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + ReleaseConnection(fmstate->s.conn); + fmstate->s.conn = NULL; } /* @@ -2307,7 +2471,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->s.conn = GetConnection(user, false); + dmstate->s.connspec = (PgFdwConnspecate *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate)); /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ @@ -2360,7 +2526,10 @@ postgresIterateDirectModify(ForeignScanState *node) * If this is the first call after Begin, execute the statement. */ if (dmstate->num_tuples == -1) + { + vacate_connection((PgFdwState *)dmstate); execute_dml_stmt(node); + } /* * If the local query doesn't specify RETURNING, just clear tuple slot. 
@@ -2407,8 +2576,8 @@ postgresEndDirectModify(ForeignScanState *node) PQclear(dmstate->result); /* Release remote connection */ - ReleaseConnection(dmstate->conn); - dmstate->conn = NULL; + ReleaseConnection(dmstate->s.conn); + dmstate->s.conn = NULL; /* MemoryContext will be deleted automatically. */ } @@ -2527,6 +2696,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_param_join_conds; StringInfoData sql; PGconn *conn; + PgFdwConnspecate *connspec; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; @@ -2570,6 +2740,16 @@ estimate_path_cost_size(PlannerInfo *root, /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); + connspec = GetConnectionSpecificStorage(fpinfo->user, + sizeof(PgFdwConnspecate)); + if (connspec) + { + PgFdwState tmpstate; + tmpstate.conn = conn; + tmpstate.connspec = connspec; + vacate_connection(&tmpstate); + } + get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -2924,11 +3104,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; StringInfoData buf; PGresult *res; @@ -2994,47 +3174,96 @@ create_cursor(ForeignScanState *node) * Fetch some more rows from the node's cursor. 
*/ static void -fetch_more_data(ForeignScanState *node) +request_more_data(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + PGconn *conn = fsstate->s.conn; + char sql[64]; + + /* The connection should be vacant */ + Assert(fsstate->s.connspec->current_owner == NULL); + + /* + * If this is the first call after Begin or ReScan, we need to create the + * cursor on the remote side. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + + fsstate->s.connspec->current_owner = node; +} + +/* + * Fetch some more rows from the node's cursor. + */ +static void +fetch_received_data(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); PGresult *volatile res = NULL; MemoryContext oldcontext; + /* I should be the current connection owner */ + Assert(fsstate->s.connspec->current_owner == node); + /* * We'll store the tuples in the batch_cxt. First, flush the previous - * batch. + * batch if no tuple is remaining */ - fsstate->tuples = NULL; - MemoryContextReset(fsstate->batch_cxt); + if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + MemoryContextReset(fsstate->batch_cxt); + } + else if (fsstate->next_tuple > 0) + { + /* move the remaining tuples to the beginning of the store */ + int n = 0; + + while(fsstate->next_tuple < fsstate->num_tuples) + fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++]; + fsstate->num_tuples = n; + } + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. 
*/ PG_TRY(); { - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; char sql[64]; - int numrows; + int addrows; + size_t newsize; int i; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); - res = pgfdw_exec_query(conn, sql); + res = pgfdw_get_result(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + addrows = PQntuples(res); + newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple); + if (fsstate->tuples) + fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize); + else + fsstate->tuples = (HeapTuple *) palloc(newsize); - for (i = 0; i < numrows; i++) + for (i = 0; i < addrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan)); - fsstate->tuples[i] = + fsstate->tuples[fsstate->num_tuples + i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, @@ -3044,27 +3273,82 @@ fetch_more_data(ForeignScanState *node) } /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) + if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0) fsstate->fetch_ct_2++; + fsstate->next_tuple = 0; + fsstate->num_tuples += addrows; + /* Must be EOF if we didn't get as many tuples as we asked for. 
*/ - fsstate->eof_reached = (numrows < fsstate->fetch_size); + fsstate->eof_reached = (addrows < fsstate->fetch_size); PQclear(res); res = NULL; } PG_CATCH(); { + fsstate->s.connspec->current_owner = NULL; if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); + fsstate->s.connspec->current_owner = NULL; + MemoryContextSwitchTo(oldcontext); } /* + * Vacate a connection so that this node can send the next query + */ +static void +vacate_connection(PgFdwState *fdwstate) +{ + PgFdwConnspecate *connspec = fdwstate->connspec; + ForeignScanState *owner; + + if (connspec == NULL || connspec->current_owner == NULL) + return; + + /* + * let the current connection owner read the result for the running query + */ + owner = connspec->current_owner; + fetch_received_data(owner); + + /* Clear the waiting list */ + while (owner) + { + PgFdwScanState *fsstate = GetPgFdwScanState(owner); + + fsstate->last_waiter = NULL; + owner = fsstate->waiter; + fsstate->waiter = NULL; + } +} + +/* + * Absorb the result of the current query. + */ +static void +absorb_current_result(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *owner = fsstate->s.connspec->current_owner; + + if (owner) + { + PgFdwScanState *target_state = GetPgFdwScanState(owner); + PGconn *conn = target_state->s.conn; + + while(PQisBusy(conn)) + PQclear(PQgetResult(conn)); + fsstate->s.connspec->current_owner = NULL; + fsstate->async_waiting = false; + } +} +/* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * @@ -3148,7 +3432,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) /* Construct name we'll use for the prepared statement. 
*/ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", - GetPrepStmtNumber(fmstate->conn)); + GetPrepStmtNumber(fmstate->s.conn)); p_name = pstrdup(prep_name); /* @@ -3158,12 +3442,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. */ - if (!PQsendPrepare(fmstate->conn, + if (!PQsendPrepare(fmstate->s.conn, p_name, fmstate->query, 0, NULL)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3171,9 +3455,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ @@ -3304,9 +3588,9 @@ execute_dml_stmt(ForeignScanState *node) * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ - if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query); /* * Get the result, and check for success. @@ -3314,10 +3598,10 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true, dmstate->query); /* Get the number of rows affected. */ @@ -4463,8 +4747,10 @@ postgresIsForeignPathAsyncCapable(ForeignPath *path) } /* - * XXX. Just for testing purposes, let's run everything through the async - * mechanism but return tuples synchronously. + * Accept async request. Notify the caller if the next tuple is immediately + * available. ExecForeignScan does additional work to finish the returned + * tuple, so call it instead of postgresIterateForeignScan to acquire a tuple + * in the expected shape. */ static void postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq) @@ -4473,22 +4759,59 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq) TupleTableSlot *slot; Assert(IsA(node, ForeignScanState)); + GetPgFdwScanState(node)->run_async = true; slot = ExecForeignScan(node); - ExecAsyncRequestDone(estate, areq, (Node *) slot); + if (GetPgFdwScanState(node)->result_ready) + ExecAsyncRequestDone(estate, areq, (Node *) slot); + else + ExecAsyncSetRequiredEvents(estate, areq, 1, false, false); } +/* + * Configure waiting event. + * + * Add a wait event only when the node is the connection owner. Otherwise + * another node on this connection is the owner.
+ */ static bool postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, - bool reinit) + bool reinit) { - elog(ERROR, "postgresForeignAsyncConfigureWait"); + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = GetPgFdwScanState(node); + + + /* If the caller didn't reinit, this event is already in event set */ + if (!reinit) + return true; + + if (fsstate->s.connspec->current_owner == node) + { + AddWaitEventToSet(estate->es_wait_event_set, + WL_SOCKET_READABLE, PQsocket(fsstate->s.conn), + NULL, areq); + return true; + } + return false; } +/* + * Process a notification from async mechanism. ExecForeignScan does + * additional work to complete the returning tuple, so call it instead of + * postgresIterateForeignScan to acquire a completed tuple. + */ static void postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq) { - elog(ERROR, "postgresForeignAsyncNotify"); + ForeignScanState *node = (ForeignScanState *) areq->requestee; + TupleTableSlot *slot; + + Assert(IsA(node, ForeignScanState)); + slot = ExecForeignScan(node); + Assert(GetPgFdwScanState(node)->result_ready); + + ExecAsyncRequestDone(estate, areq, (Node *) slot); } /* @@ -4848,7 +5171,7 @@ make_tuple_from_result_row(PGresult *res, PgFdwScanState *fdw_sstate; Assert(fsstate); - fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; + fdw_sstate = GetPgFdwScanState(fsstate); tupdesc = fdw_sstate->tupdesc; } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 46cac55..b3ac615 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -79,6 +79,7 @@ typedef struct PgFdwRelationInfo UserMapping *user; /* only set in use_remote_estimate mode */ int fetch_size; /* fetch size for this remote table */ + bool allow_prefetch; /* true to allow overlapped fetching */ /* * Name of the relation while EXPLAINing ForeignScan. 
It is used for join @@ -103,6 +104,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e19a3ef..3ae12bc 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1575,8 +1575,8 @@ explain (verbose, costs off) delete from foo where f1 < 5 returning *; delete from foo where f1 < 5 returning *; explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; drop table foo cascade; drop table bar cascade; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 0dd95c6..1cba31e 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -115,6 +115,7 @@ #include "executor/nodeValuesscan.h" #include "executor/nodeWindowAgg.h" #include "executor/nodeWorktablescan.h" +#include "foreign/fdwapi.h" #include "nodes/nodeFuncs.h" #include "miscadmin.h" @@ -820,6 +821,14 @@ ExecShutdownNode(PlanState *node) case T_GatherState: ExecShutdownGather((GatherState *) node); break; + case T_ForeignScanState: + { + ForeignScanState *fsstate = (ForeignScanState *)node; + FdwRoutine *fdwroutine = fsstate->fdwroutine; + if (fdwroutine->ShutdownForeignScan) + fdwroutine->ShutdownForeignScan((ForeignScanState *) node); + } + break; default: break; } diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 41fc76f..11c3434 
100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -163,6 +163,7 @@ typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate, bool reinit); typedef void (*ForeignAsyncNotify_function) (EState *estate, PendingAsyncRequest *areq); +typedef void (*ShutdownForeignScan_function) (ForeignScanState *node); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler @@ -239,6 +240,7 @@ typedef struct FdwRoutine ForeignAsyncRequest_function ForeignAsyncRequest; ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; ForeignAsyncNotify_function ForeignAsyncNotify; + ShutdownForeignScan_function ShutdownForeignScan; } FdwRoutine; -- 2.9.2