From ea2818eacb8065f90f52dfbaa55370f0bd92ede6 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 26 Mar 2020 21:28:58 +0500 Subject: [PATCH v23 5/7] postgres_fdw supports atomic commit APIs. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/Makefile | 7 +- contrib/postgres_fdw/connection.c | 588 +++++++++++------- .../postgres_fdw/expected/postgres_fdw.out | 280 ++++++++- contrib/postgres_fdw/fdwxact.conf | 3 + contrib/postgres_fdw/postgres_fdw.c | 21 +- contrib/postgres_fdw/postgres_fdw.h | 8 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 124 +++- doc/src/sgml/postgres-fdw.sgml | 10 +- 8 files changed, 785 insertions(+), 256 deletions(-) create mode 100644 contrib/postgres_fdw/fdwxact.conf diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index ee8a80a392..91fa6e39fc 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -16,7 +16,7 @@ SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw DATA = postgres_fdw--1.0.sql -REGRESS = postgres_fdw +REGRESSCHECK = postgres_fdw ifdef USE_PGXS PG_CONFIG = pg_config @@ -29,3 +29,8 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif + +check: + $(pg_regress_check) \ + --temp-config $(top_srcdir)/contrib/postgres_fdw/fdwxact.conf \ + $(REGRESSCHECK) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 52d1fe3563..25280cbd94 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * connection.c - * Connection management functions for postgres_fdw + * Connection and transaction management functions for postgres_fdw * * Portions Copyright (c) 2012-2020, PostgreSQL Global Development Group * @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_user_mapping.h" @@ -56,6 +57,7 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ bool invalidated; /* true if reconnect is pending */ + bool xact_got_connection; uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; @@ -69,17 +71,13 @@ static HTAB *ConnectionHash = NULL; static unsigned int cursor_number = 0; static unsigned int prep_stmt_number = 0; -/* tracks whether any work is needed in callback functions */ -static bool xact_got_connection = false; - /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); -static void begin_remote_xact(ConnCacheEntry *entry); -static void pgfdw_xact_callback(XactEvent event, void *arg); +static void begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -92,6 +90,12 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); static bool UserMappingPasswordRequired(UserMapping *user); +static void pgfdw_end_prepared_xact(ConnCacheEntry *entry, char *fdwxact_id, + bool is_commit); +static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry); +static ConnCacheEntry *GetConnectionState(ForeignServer *server, UserMapping *userg, + bool will_prep_stmt, bool start_transaction); +static ConnCacheEntry *GetConnectionCacheEntry(Oid umid); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -104,11 +108,29 @@ static bool UserMappingPasswordRequired(UserMapping *user); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, bool start_transaction) { - bool found; ConnCacheEntry *entry; - ConnCacheKey key; + + entry = GetConnectionState(GetForeignServer(user->serverid), + user, will_prep_stmt, start_transaction); + + return entry->conn; +} + +/* + * Get connection cache entry. Unlike GetConenctionState function, this function + * doesn't establish new connection even if not yet. + */ +static ConnCacheEntry * +GetConnectionCacheEntry(Oid umid) +{ + ConnCacheEntry *entry; + ConnCacheKey key; + bool found; + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = umid; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -128,7 +150,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) * Register some callback functions that manage connection cleanup. * This should be done just once in each backend. */ - RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL); CacheRegisterSyscacheCallback(FOREIGNSERVEROID, pgfdw_inval_callback, (Datum) 0); @@ -136,12 +157,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) pgfdw_inval_callback, (Datum) 0); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; - /* * Find or create cached entry for requested connection. */ @@ -155,6 +170,22 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->conn = NULL; } + return entry; +} + +/* + * This function gets the connection cache entry and establishes connection + * to the foreign server if there is no connection and starts a new transaction + * if 'start_transaction' is true. + */ +static ConnCacheEntry * +GetConnectionState(ForeignServer *server, UserMapping *user, + bool will_prep_stmt, bool start_transaction) +{ + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(user->umid); + /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); @@ -182,14 +213,13 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ if (entry->conn == NULL) { - ForeignServer *server = GetForeignServer(user->serverid); - /* Reset all transient state fields, to be sure all are clean */ entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; entry->changing_xact_state = false; entry->invalidated = false; + entry->xact_got_connection = false; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -200,6 +230,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); + Assert(entry->conn); + + if (!entry->conn) + { + elog(DEBUG3, "attempt to connection to server \"%s\" by postgres_fdw failed", + server->servername); + return NULL; + } + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); } @@ -207,12 +246,18 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Start a new transaction or subtransaction if needed. */ - begin_remote_xact(entry); + if (start_transaction) + { + begin_remote_xact(entry, user->serverid, user->userid); + + /* Set flag that we did GetConnection during the current transaction */ + entry->xact_got_connection = true; + } /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; - return entry->conn; + return entry; } /* @@ -473,7 +518,7 @@ do_sql_command(PGconn *conn, const char *sql) * control which remote queries share a snapshot. */ static void -begin_remote_xact(ConnCacheEntry *entry) +begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid) { int curlevel = GetCurrentTransactionNestLevel(); @@ -700,193 +745,6 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, PG_END_TRY(); } -/* - * pgfdw_xact_callback --- cleanup at main-transaction end. - */ -static void -pgfdw_xact_callback(XactEvent event, void *arg) -{ - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - - /* - * Scan all connection cache entries to find open remote transactions, and - * close them. - */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) - { - PGresult *res; - - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; - - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) - { - bool abort_cleanup_failure = false; - - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); - - switch (event) - { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - - /* - * If abort cleanup previously failed for this connection, - * we can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - - /* Commit all remote transactions during pre-commit */ - entry->changing_xact_state = true; - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - entry->changing_xact_state = false; - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow any remote transactions, since it's not - * very reasonable to hold them open until the prepared - * transaction is committed. For the moment, throw error - * unconditionally; later we might allow read-only cases. - * Note that the error will cause us to come right back - * here with event == XACT_EVENT_ABORT, so we'll clean up - * the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. */ - abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; - break; - } - } - - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; - - /* - * If the connection isn't in a good idle state, discard it to - * recover. Next GetConnection will open a new connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } - } - - /* - * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) - */ - xact_got_connection = false; - - /* Also reset cursor numbering for next transaction */ - cursor_number = 0; -} - /* * pgfdw_subxact_callback --- cleanup at subtransaction end. */ @@ -903,10 +761,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, event == SUBXACT_EVENT_ABORT_SUB)) return; - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - /* * Scan all connection cache entries to find open remote subtransactions * of the current level, and close them. @@ -917,6 +771,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, { char sql[100]; + /* Quick exit if no connections were touched in this transaction. */ + if (!entry->xact_got_connection) + continue; + /* * We only care about connections with open remote subtransactions of * the current level. @@ -1251,3 +1109,309 @@ exit: ; *result = last_res; return timed_out; } + +/* + * Prepare a transaction on foreign server. + */ +void +postgresPrepareForeignTransaction(FdwXactRslvState *frstate) +{ + ConnCacheEntry *entry = NULL; + PGresult *res; + StringInfo command; + + /* The transaction should have started already get the cache entry */ + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + + /* The transaction should have been started */ + Assert(entry->xact_got_connection && entry->conn); + + pgfdw_reject_incomplete_xact_state_change(entry); + + command = makeStringInfo(); + appendStringInfo(command, "PREPARE TRANSACTION '%s'", frstate->fdwxact_id); + + /* Do prepare foreign transaction */ + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, command->data); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not prepare transaction on server %s with ID %s", + frstate->server->servername, frstate->fdwxact_id))); + + elog(DEBUG1, "prepared foreign transaction on server %s with ID %s", + frstate->server->servername, frstate->fdwxact_id); + + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + pgfdw_cleanup_after_transaction(entry); +} + +/* + * Commit a transaction or a prepared transaction on foreign server. If + * state->flags contains FDWXACT_FLAG_ONEPHASE this function can commit the + * foreign transaction without preparation, otherwise commit the prepared + * transaction. + */ +void +postgresCommitForeignTransaction(FdwXactRslvState *frstate) +{ + ConnCacheEntry *entry = NULL; + bool is_onephase = (frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0; + PGresult *res; + + if (!is_onephase) + { + /* + * In two-phase commit case, the foreign transaction has prepared and + * closed, so we might not have a connection to it. We get a connection + * but don't start transaction. + */ + entry = GetConnectionState(frstate->server, frstate->usermapping, false, false); + + /* COMMIT PREPARED the transaction */ + pgfdw_end_prepared_xact(entry, frstate->fdwxact_id, true); + return; + } + + /* + * In simple commit case, we must have a connection to the foreign server + * because the foreign transaction is not closed yet. We get the connection + * entry from the cache. + */ + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + Assert(entry); + + if (!entry->conn || !entry->xact_got_connection) + return; + + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, "COMMIT TRANSACTION"); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not commit transaction on server %s", + frstate->server->servername))); + + /* + * If there were any errors in subtransactions, and we ma + * made prepared statements, do a DEALLOCATE ALL to make + * sure we get rid of all prepared statements. This is + * annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this + * constrains how old a server postgres_fdw can + * communicate with. We intentionally ignore errors in + * the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements + * as we go; but we don't really support update operations + * pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} + +/* + * Rollback a transaction on foreign server. As with commit case, if state->flags + * contains FDWAXCT_FLAG_ONEPHASE this function can rollback the foreign + * transaction without preparation, other wise rollback the prepared transaction. + * This function must tolerate to being called recursively as an error can happen + * during aborting. + */ +void +postgresRollbackForeignTransaction(FdwXactRslvState *frstate) +{ + bool is_onephase = (frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0; + ConnCacheEntry *entry = NULL; + bool abort_cleanup_failure = false; + + if (!is_onephase) + { + /* + * In two-phase commit case, the foreign transaction has prepared and + * closed, so we might not have a connection to it. We get a connection + * but don't start transaction. + */ + entry = GetConnectionState(frstate->server, frstate->usermapping, false, false); + + /* ROLLBACK PREPARED the transaction */ + pgfdw_end_prepared_xact(entry, frstate->fdwxact_id, false); + return; + } + + /* + * In simple rollback case, we must have a connection to the foreign server + * because the foreign transaction is not closed yet. We get the connection + * entry from the cache. + */ + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + Assert(entry); + + /* + * Cleanup connection entry transaction if transaction fails before + * establishing a connection or starting transaction. + */ + if (!entry->conn || !entry->xact_got_connection) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Don't try to clean up the connection if we're already + * in error recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is before starting transaction or is already unsalvageable, + * do only the cleanup and don't touch it further. + */ + if (entry->changing_xact_state) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Mark this connection as in the process of changing + * transaction state. + */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by + * using an asynchronous execution function, the command + * might not have yet completed. Check to see if a + * command is still being processed by the remote server, + * and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + { + /* Unable to cancel running query. */ + abort_cleanup_failure = true; + } + else if (!pgfdw_exec_cleanup_query(entry->conn, + "ABORT TRANSACTION", + false)) + { + /* Unable to abort remote transaction. */ + abort_cleanup_failure = true; + } + else if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + { + /* Trouble clearing prepared statements. */ + abort_cleanup_failure = true; + } + + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); + + return; +} + +/* + * Commit or rollback prepared transaction on the foreign server. + */ +static void +pgfdw_end_prepared_xact(ConnCacheEntry *entry, char *fdwxact_id, bool is_commit) +{ + StringInfo command; + PGresult *res; + + command = makeStringInfo(); + appendStringInfo(command, "%s PREPARED '%s'", + is_commit ? "COMMIT" : "ROLLBACK", + fdwxact_id); + + res = pgfdw_exec_query(entry->conn, command->data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + int sqlstate; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + if (diag_sqlstate) + { + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + } + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * As core global transaction manager states, it's possible that the + * given foreign transaction doesn't exist on the foreign server. So + * we should accept an UNDEFINED_OBJECT error. + */ + if (sqlstate != ERRCODE_UNDEFINED_OBJECT) + pgfdw_report_error(ERROR, res, entry->conn, false, command->data); + } + + elog(DEBUG1, "%s prepared foreign transaction with ID %s", + is_commit ? "commit" : "rollback", + fdwxact_id); + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} + +/* Cleanup at main-transaction end */ +static void +pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) +{ + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + entry->xact_got_connection = false; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } + + entry->changing_xact_state = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 82fc1290ef..dbdd4cc32c 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -13,12 +13,17 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server -- =================================================================== @@ -52,6 +57,13 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) +); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false'); @@ -87,6 +99,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables -- =================================================================== @@ -129,6 +142,12 @@ CREATE FOREIGN TABLE ft6 ( c2 int NOT NULL, c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_2pc ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); +CREATE FOREIGN TABLE ft8_2pc ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); -- =================================================================== -- tests for validator -- =================================================================== @@ -191,15 +210,17 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1'); ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ - List of foreign tables - Schema | Table | Server | FDW options | Description ---------+-------+-----------+---------------------------------------+------------- - public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | - public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | - public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | -(5 rows) + List of foreign tables + Schema | Table | Server | FDW options | Description +--------+---------+-----------+---------------------------------------+------------- + public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | + public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | + public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | + public | ft7_2pc | loopback | (schema_name 'S 1', table_name 'T 5') | + public | ft8_2pc | loopback2 | (schema_name 'S 1', table_name 'T 5') | +(7 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -8923,10 +8944,10 @@ RESET ROLE; ALTER USER MAPPING FOR regress_nosuper SERVER loopback_nopw OPTIONS (ADD password_required 'false'); SET ROLE regress_nosuper; -- Should finally work now -SELECT * FROM ft1_nopw LIMIT 1; - c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 -------+----+----+----+----+----+------------+---- - 1111 | 2 | | | | | ft1 | +SELECT * FROM ft1_nopw ORDER BY 1 LIMIT 1; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------------------+------------------------------+--------------------------+----+------------+----- + 1 | 2 | 00001_trig_update | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo (1 row) -- unpriv user also cannot set sslcert / sslkey on the user mapping @@ -8943,16 +8964,16 @@ HINT: User mappings with the sslcert or sslkey options set may only be created DROP USER MAPPING FOR CURRENT_USER SERVER loopback_nopw; -- This will fail again as it'll resolve the user mapping for public, which -- lacks password_required=false -SELECT * FROM ft1_nopw LIMIT 1; +SELECT * FROM ft1_nopw ORDER BY 1 LIMIT 1; ERROR: password is required DETAIL: Non-superusers must provide a password in the user mapping. RESET ROLE; -- The user mapping for public is passwordless and lacks the password_required=false -- mapping option, but will work because the current user is a superuser. SELECT * FROM ft1_nopw LIMIT 1; - c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 -------+----+----+----+----+----+------------+---- - 1111 | 2 | | | | | ft1 | + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo (1 row) -- cleanup @@ -8961,16 +8982,225 @@ DROP OWNED BY regress_nosuper; DROP ROLE regress_nosuper; -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. +-- =================================================================== +-- test distributed atomic commit across foreign servers +-- =================================================================== +-- Enable atomic commit +SET foreign_twophase_commit TO 'required'; +-- Modify single foreign server and then commit and rollback. +BEGIN; +INSERT INTO ft7_2pc VALUES(1); +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 +(1 row) + +BEGIN; +INSERT INTO ft7_2pc VALUES(1); +ROLLBACK; +SELECT * FROM ft7_2pc; + c1 +---- + 1 +(1 row) + +-- Modify two servers then commit and rollback. This requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +COMMIT; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 +(3 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +ROLLBACK; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 +(3 rows) + +-- Modify both local data and 2PC-capable server then commit and rollback. +-- This also requires to use 2PC. BEGIN; -SELECT count(*) FROM ft1; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 3 +(1 row) + +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +ERROR: duplicate key value violates unique constraint "t6_pkey" +DETAIL: Key (c1)=(3) already exists. +ROLLBACK; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 3 +(1 row) + +-- Modify foreign server and raise an error. No data changed. +BEGIN; +INSERT INTO ft7_2pc VALUES(4); +INSERT INTO ft8_2pc VALUES(NULL); -- violation +ERROR: null value in column "c1" of relation "T 5" violates not-null constraint +DETAIL: Failing row contains (null). +CONTEXT: remote SQL command: INSERT INTO "S 1"."T 5"(c1) VALUES ($1) +ROLLBACK; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES (5); +INSERT INTO ft8_2pc VALUES (5); +SAVEPOINT S1; +INSERT INTO ft7_2pc VALUES (6); +INSERT INTO ft8_2pc VALUES (6); +ROLLBACK TO S1; +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 +(6 rows) + +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 +(6 rows) + +RELEASE SAVEPOINT S1; +ERROR: RELEASE SAVEPOINT can only be used in transaction blocks +-- When set to 'disabled', we can commit it +SET foreign_twophase_commit TO 'disabled'; +BEGIN; +INSERT INTO ft7_2pc VALUES(8); +INSERT INTO ft8_2pc VALUES(8); +COMMIT; -- success +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 +(8 rows) + +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 +(8 rows) + +SET foreign_twophase_commit TO 'required'; +-- Commit and rollback foreign transactions that are part of +-- prepare transaction. +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 + 9 + 9 +(10 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 + 9 + 9 +(10 rows) + +-- No entry remained +SELECT count(*) FROM pg_foreign_xacts; count ------- - 822 + 0 (1 row) --- error here -PREPARE TRANSACTION 'fdw_tpc'; -ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables -ROLLBACK; -WARNING: there is no transaction in progress diff --git a/contrib/postgres_fdw/fdwxact.conf b/contrib/postgres_fdw/fdwxact.conf new file mode 100644 index 0000000000..3fdbf93cdb --- /dev/null +++ b/contrib/postgres_fdw/fdwxact.conf @@ -0,0 +1,3 @@ +max_prepared_transactions = 3 +max_prepared_foreign_transactions = 3 +max_foreign_transaction_resolvers = 2 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..bf21fbd8ba 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -14,6 +14,7 @@ #include +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "access/table.h" @@ -504,7 +505,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); - /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. @@ -558,6 +558,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for foreign transactions */ + routine->PrepareForeignTransaction = postgresPrepareForeignTransaction; + routine->CommitForeignTransaction = postgresCommitForeignTransaction; + routine->RollbackForeignTransaction = postgresRollbackForeignTransaction; + PG_RETURN_POINTER(routine); } @@ -1434,7 +1439,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, true); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -2372,7 +2377,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, true); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2746,7 +2751,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, true); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3566,7 +3571,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, true); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -4441,7 +4446,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true); /* * Construct command to get page count for relation. @@ -4527,7 +4532,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true); /* * Construct cursor that retrieves whole rows from remote. @@ -4755,7 +4760,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, true); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..5445569301 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -13,6 +13,7 @@ #ifndef POSTGRES_FDW_H #define POSTGRES_FDW_H +#include "access/fdwxact.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" @@ -129,7 +130,8 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + bool start_transaction); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); @@ -137,6 +139,9 @@ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern void postgresPrepareForeignTransaction(FdwXactRslvState *frstate); +extern void postgresCommitForeignTransaction(FdwXactRslvState *frstate); +extern void postgresRollbackForeignTransaction(FdwXactRslvState *frstate); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, @@ -203,6 +208,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, bool is_subquery, List **retrieved_attrs, List **params_list); extern const char *get_jointype_name(JoinType jointype); +extern bool server_uses_twophase_commit(ForeignServer *server); /* in shippable.c */ extern bool is_builtin(Oid objectId); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..1ef66123df 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -15,6 +15,10 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; @@ -22,6 +26,7 @@ CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server @@ -56,6 +61,14 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); + +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) +); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); @@ -94,6 +107,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables @@ -142,6 +156,15 @@ CREATE FOREIGN TABLE ft6 ( c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_2pc ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); + +CREATE FOREIGN TABLE ft8_2pc ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); + + -- =================================================================== -- tests for validator -- =================================================================== @@ -2598,7 +2621,7 @@ ALTER USER MAPPING FOR regress_nosuper SERVER loopback_nopw OPTIONS (ADD passwor SET ROLE regress_nosuper; -- Should finally work now -SELECT * FROM ft1_nopw LIMIT 1; +SELECT * FROM ft1_nopw ORDER BY 1 LIMIT 1; -- unpriv user also cannot set sslcert / sslkey on the user mapping -- first set password_required so we see the right error messages @@ -2612,7 +2635,7 @@ DROP USER MAPPING FOR CURRENT_USER SERVER loopback_nopw; -- This will fail again as it'll resolve the user mapping for public, which -- lacks password_required=false -SELECT * FROM ft1_nopw LIMIT 1; +SELECT * FROM ft1_nopw ORDER BY 1 LIMIT 1; RESET ROLE; @@ -2628,9 +2651,98 @@ DROP ROLE regress_nosuper; -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. +-- =================================================================== +-- test distributed atomic commit across foreign servers +-- =================================================================== + +-- Enable atomic commit +SET foreign_twophase_commit TO 'required'; + +-- Modify single foreign server and then commit and rollback. BEGIN; -SELECT count(*) FROM ft1; --- error here -PREPARE TRANSACTION 'fdw_tpc'; +INSERT INTO ft7_2pc VALUES(1); +COMMIT; +SELECT * FROM ft7_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES(1); ROLLBACK; +SELECT * FROM ft7_2pc; + +-- Modify two servers then commit and rollback. This requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +COMMIT; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +ROLLBACK; +SELECT * FROM ft8_2pc; + +-- Modify both local data and 2PC-capable server then commit and rollback. +-- This also requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +COMMIT; +SELECT * FROM ft7_2pc; +SELECT * FROM "S 1"."T 6"; + +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +ROLLBACK; +SELECT * FROM ft7_2pc; +SELECT * FROM "S 1"."T 6"; + +-- Modify foreign server and raise an error. No data changed. +BEGIN; +INSERT INTO ft7_2pc VALUES(4); +INSERT INTO ft8_2pc VALUES(NULL); -- violation +ROLLBACK; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES (5); +INSERT INTO ft8_2pc VALUES (5); +SAVEPOINT S1; +INSERT INTO ft7_2pc VALUES (6); +INSERT INTO ft8_2pc VALUES (6); +ROLLBACK TO S1; +COMMIT; +SELECT * FROM ft7_2pc; +SELECT * FROM ft8_2pc; +RELEASE SAVEPOINT S1; + +-- When set to 'disabled', we can commit it +SET foreign_twophase_commit TO 'disabled'; +BEGIN; +INSERT INTO ft7_2pc VALUES(8); +INSERT INTO ft8_2pc VALUES(8); +COMMIT; -- success +SELECT * FROM ft7_2pc; +SELECT * FROM ft8_2pc; + +SET foreign_twophase_commit TO 'required'; + +-- Commit and rollback foreign transactions that are part of +-- prepare transaction. +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + +-- No entry remained +SELECT count(*) FROM pg_foreign_xacts; diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index eab2cc9378..8783f2077c 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -521,9 +521,13 @@ OPTIONS (ADD password_required 'false'); - Note that it is currently not supported by - postgres_fdw to prepare the remote transaction for - two-phase commit. + postgrs_fdw support to prepare the remote transaction + for two-phase commit. Also, if two-phase commit protocol is required to + commit the distributed transaction, postgres_fdw commits + the remote transaction using two-phase commit protocol + (see ). So the remote server needs to set + set more than one so that + it can prepare the remote transaction. -- 2.23.0