From 1cf4c02491e0536eaf38952c9e1c3d32d5455e5d Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Sat, 29 Aug 2020 00:14:36 +0900 Subject: [PATCH v32 02/11] postgres_fdw supports commit and rollback APIs. This commit implements both CommitForeignTransaction and RollbackForeignTransaction APIs in postgres_fdw. Note that since PREPARE TRANSACTION is still not supported this commit doesn't change anything user newly is able to do. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/connection.c | 470 +++++++++--------- .../postgres_fdw/expected/postgres_fdw.out | 2 +- contrib/postgres_fdw/postgres_fdw.c | 4 + contrib/postgres_fdw/postgres_fdw.h | 3 + 4 files changed, 237 insertions(+), 242 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 266f66cc62..e8aafca42d 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -16,6 +16,7 @@ #include "access/xact.h" #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" +#include "foreign/fdwapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -80,8 +81,7 @@ static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); -static void begin_remote_xact(ConnCacheEntry *entry); -static void pgfdw_xact_callback(XactEvent event, void *arg); +static void begin_remote_xact(ConnCacheEntry *entry, UserMapping *user); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -94,6 +94,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); static bool UserMappingPasswordRequired(UserMapping *user); +static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry); +static ConnCacheEntry *GetConnectionCacheEntry(Oid umid); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -108,53 +110,11 @@ static bool UserMappingPasswordRequired(UserMapping *user); PGconn * GetConnection(UserMapping *user, bool will_prep_stmt) { - bool found; bool retry = false; ConnCacheEntry *entry; - ConnCacheKey key; MemoryContext ccxt = CurrentMemoryContext; - /* First time through, initialize connection cache hashtable */ - if (ConnectionHash == NULL) - { - HASHCTL ctl; - - ctl.keysize = sizeof(ConnCacheKey); - ctl.entrysize = sizeof(ConnCacheEntry); - ConnectionHash = hash_create("postgres_fdw connections", 8, - &ctl, - HASH_ELEM | HASH_BLOBS); - - /* - * Register some callback functions that manage connection cleanup. - * This should be done just once in each backend. - */ - RegisterXactCallback(pgfdw_xact_callback, NULL); - RegisterSubXactCallback(pgfdw_subxact_callback, NULL); - CacheRegisterSyscacheCallback(FOREIGNSERVEROID, - pgfdw_inval_callback, (Datum) 0); - CacheRegisterSyscacheCallback(USERMAPPINGOID, - pgfdw_inval_callback, (Datum) 0); - } - - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; - - /* - * Find or create cached entry for requested connection. - */ - entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); - if (!found) - { - /* - * We need only clear "conn" here; remaining fields will be filled - * later when "conn" is set. - */ - entry->conn = NULL; - } + entry = GetConnectionCacheEntry(user->umid); /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); @@ -186,7 +146,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) PG_TRY(); { /* Start a new transaction or subtransaction if needed. */ - begin_remote_xact(entry); + begin_remote_xact(entry, user); } PG_CATCH(); { @@ -247,7 +207,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) if (entry->conn == NULL) make_new_connection(entry, user); - begin_remote_xact(entry); + begin_remote_xact(entry, user); } /* Remember if caller will prepare statements */ @@ -256,6 +216,56 @@ GetConnection(UserMapping *user, bool will_prep_stmt) return entry->conn; } +static ConnCacheEntry * +GetConnectionCacheEntry(Oid umid) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + /* First time through, initialize connection cache hashtable */ + if (ConnectionHash == NULL) + { + HASHCTL ctl; + + ctl.keysize = sizeof(ConnCacheKey); + ctl.entrysize = sizeof(ConnCacheEntry); + ConnectionHash = hash_create("postgres_fdw connections", 8, + &ctl, + HASH_ELEM | HASH_BLOBS); + + /* + * Register some callback functions that manage connection cleanup. + * This should be done just once in each backend. + */ + RegisterSubXactCallback(pgfdw_subxact_callback, NULL); + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + pgfdw_inval_callback, (Datum) 0); + CacheRegisterSyscacheCallback(USERMAPPINGOID, + pgfdw_inval_callback, (Datum) 0); + } + + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = umid; + + /* + * Find or create cached entry for requested connection. + */ + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + /* + * We need only clear "conn" here; remaining fields will be filled + * later when "conn" is set. + */ + entry->conn = NULL; + } + return entry; +} + /* * Reset all transient state fields in the cached connection entry and * establish new connection to the remote server. @@ -545,7 +555,7 @@ do_sql_command(PGconn *conn, const char *sql) * control which remote queries share a snapshot. */ static void -begin_remote_xact(ConnCacheEntry *entry) +begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) { int curlevel = GetCurrentTransactionNestLevel(); @@ -557,6 +567,9 @@ begin_remote_xact(ConnCacheEntry *entry) elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + /* Register the foreign server to the transaction */ + FdwXactRegisterXact(user->serverid, user->userid); + if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else @@ -772,199 +785,6 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, PG_END_TRY(); } -/* - * pgfdw_xact_callback --- cleanup at main-transaction end. - * - * This runs just late enough that it must not enter user-defined code - * locally. (Entering such code on the remote side is fine. Its remote - * COMMIT TRANSACTION may run deferred triggers.) - */ -static void -pgfdw_xact_callback(XactEvent event, void *arg) -{ - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - - /* - * Scan all connection cache entries to find open remote transactions, and - * close them. - */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) - { - PGresult *res; - - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; - - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) - { - bool abort_cleanup_failure = false; - - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); - - switch (event) - { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - - /* - * If abort cleanup previously failed for this connection, - * we can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - - /* Commit all remote transactions during pre-commit */ - entry->changing_xact_state = true; - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - entry->changing_xact_state = false; - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow any remote transactions, since it's not - * very reasonable to hold them open until the prepared - * transaction is committed. For the moment, throw error - * unconditionally; later we might allow read-only cases. - * Note that the error will cause us to come right back - * here with event == XACT_EVENT_ABORT, so we'll clean up - * the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. */ - abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; - break; - } - } - - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; - - /* - * If the connection isn't in a good idle state or it is marked as - * invalid, then discard it to recover. Next GetConnection will open a - * new connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state || - entry->invalidated) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } - } - - /* - * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) - */ - xact_got_connection = false; - - /* Also reset cursor numbering for next transaction */ - cursor_number = 0; -} - /* * pgfdw_subxact_callback --- cleanup at subtransaction end. */ @@ -1341,3 +1161,171 @@ exit: ; *result = last_res; return timed_out; } + +void +postgresCommitForeignTransaction(FdwXactRslvState *frstate) +{ + ConnCacheEntry *entry; + PGresult *res; + + Assert((frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0); + + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + + Assert(entry->conn); + + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, "COMMIT TRANSACTION"); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not commit transaction on server %s", + frstate->server->servername))); + + /* + * If there were any errors in subtransactions, and we ma + * made prepared statements, do a DEALLOCATE ALL to make + * sure we get rid of all prepared statements. This is + * annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this + * constrains how old a server postgres_fdw can + * communicate with. We intentionally ignore errors in + * the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements + * as we go; but we don't really support update operations + * pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} + +void +postgresRollbackForeignTransaction(FdwXactRslvState *frstate) +{ + ConnCacheEntry *entry = NULL; + bool abort_cleanup_failure = false; + + Assert((frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0); + + /* + * In simple rollback case, we must have a connection to the foreign server + * because the foreign transaction is not closed yet. We get the connection + * entry from the cache. + */ + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + Assert(entry); + + /* + * Cleanup connection entry transaction if transaction fails before + * establishing a connection. + */ + if (!entry->conn) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Don't try to clean up the connection if we're already + * in error recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is before starting transaction or is already unsalvageable, + * do only the cleanup and don't touch it further. + */ + if (entry->changing_xact_state) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Mark this connection as in the process of changing + * transaction state. + */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by + * using an asynchronous execution function, the command + * might not have yet completed. Check to see if a + * command is still being processed by the remote server, + * and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + { + /* Unable to cancel running query. */ + abort_cleanup_failure = true; + } + else if (!pgfdw_exec_cleanup_query(entry->conn, + "ABORT TRANSACTION", + false)) + { + /* Unable to abort remote transaction. */ + abort_cleanup_failure = true; + } + else if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + { + /* Trouble clearing prepared statements. */ + abort_cleanup_failure = true; + } + + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); + + return; +} + +/* Cleanup at main-transaction end */ +static void +pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) +{ + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } + + entry->changing_xact_state = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index c11092f8cc..3724fdab3d 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8984,7 +8984,7 @@ SELECT count(*) FROM ft1; -- error here PREPARE TRANSACTION 'fdw_tpc'; -ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables +ERROR: cannot PREPARE a transaction that has operated on foreign tables ROLLBACK; WARNING: there is no transaction in progress -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 2f2d4d171c..ad00a9ce2b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -560,6 +560,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for foreign transactions */ + routine->CommitForeignTransaction = postgresCommitForeignTransaction; + routine->RollbackForeignTransaction = postgresRollbackForeignTransaction; + PG_RETURN_POINTER(routine); } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 19ea27a1bc..d714034d6b 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -13,6 +13,7 @@ #ifndef POSTGRES_FDW_H #define POSTGRES_FDW_H +#include "access/fdwxact.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" @@ -137,6 +138,8 @@ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern void postgresCommitForeignTransaction(FdwXactRslvState *frstate); +extern void postgresRollbackForeignTransaction(FdwXactRslvState *frstate); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, -- 2.27.0