From 30b2c062599becb9266096d5a86595edff27b628 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 21 Sep 2020 17:00:21 +0900 Subject: [PATCH v32 05/11] postgres_fdw supports prepare API. This commit implements PrepareForeignTransaction API in postgres_fdw, enabling commit and rollback foreign transactions using by two-phase commit protocol. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/connection.c | 137 +++++++++++++++++- .../postgres_fdw/expected/postgres_fdw.out | 13 -- contrib/postgres_fdw/postgres_fdw.c | 1 + contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 7 - 5 files changed, 135 insertions(+), 24 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index e8aafca42d..3c22060f27 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -96,6 +96,8 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, static bool UserMappingPasswordRequired(UserMapping *user); static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry); static ConnCacheEntry *GetConnectionCacheEntry(Oid umid); +static void pgfdw_end_prepared_xact(ConnCacheEntry *entry, UserMapping *usermapping, + char *fdwxact_id, bool is_commit); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -1166,12 +1168,19 @@ void postgresCommitForeignTransaction(FdwXactRslvState *frstate) { ConnCacheEntry *entry; + bool is_onephase = (frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0; PGresult *res; - Assert((frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0); - entry = GetConnectionCacheEntry(frstate->usermapping->umid); + if (!is_onephase) + { + /* COMMIT PREPARED the transaction and cleanup */ + pgfdw_end_prepared_xact(entry, frstate->usermapping, + frstate->fdwxact_id, true); + return; + } + Assert(entry->conn); /* @@ -1217,16 +1226,24 @@ void postgresRollbackForeignTransaction(FdwXactRslvState *frstate) { ConnCacheEntry *entry = NULL; + bool is_onephase = (frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0; bool abort_cleanup_failure = false; - Assert((frstate->flags & FDWXACT_FLAG_ONEPHASE) != 0); - /* * In simple rollback case, we must have a connection to the foreign server * because the foreign transaction is not closed yet. We get the connection * entry from the cache. */ entry = GetConnectionCacheEntry(frstate->usermapping->umid); + + if (!is_onephase) + { + /* ROLLBACK PREPARED the transaction and cleanup */ + pgfdw_end_prepared_xact(entry, frstate->usermapping, + frstate->fdwxact_id, false); + return; + } + Assert(entry); /* @@ -1303,6 +1320,46 @@ postgresRollbackForeignTransaction(FdwXactRslvState *frstate) return; } +/* + * Prepare a transaction on foreign server. + */ +void +postgresPrepareForeignTransaction(FdwXactRslvState *frstate) +{ + ConnCacheEntry *entry = NULL; + PGresult *res; + StringInfo command; + + /* The transaction should have started already get the cache entry */ + entry = GetConnectionCacheEntry(frstate->usermapping->umid); + Assert(entry->conn); + + pgfdw_reject_incomplete_xact_state_change(entry); + + command = makeStringInfo(); + appendStringInfo(command, "PREPARE TRANSACTION '%s'", frstate->fdwxact_id); + + /* Do prepare foreign transaction */ + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, command->data); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not prepare transaction on server %s with ID %s", + frstate->server->servername, frstate->fdwxact_id))); + + elog(DEBUG1, "prepared foreign transaction on server %s with ID %s", + frstate->server->servername, frstate->fdwxact_id); + + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + pgfdw_cleanup_after_transaction(entry); +} + /* Cleanup at main-transaction end */ static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) @@ -1329,3 +1386,75 @@ pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) /* Also reset cursor numbering for next transaction */ cursor_number = 0; } + +/* + * Commit or rollback prepared transaction on the foreign server. + */ +static void +pgfdw_end_prepared_xact(ConnCacheEntry *entry, UserMapping *usermapping, + char *fdwxact_id, bool is_commit) +{ + StringInfo command; + PGresult *res; + + /* + * Check the connection status for the case the previous attempt + * failed. + */ + if (entry->conn && PQstatus(entry->conn) != CONNECTION_OK) + disconnect_pg_server(entry); + + /* + * In two-phase commit case, since the transaction is about to be + * resolved by a different process than the process who prepared it, + * we might not have a connection yet. + */ + if (!entry->conn) + make_new_connection(entry, usermapping); + + command = makeStringInfo(); + appendStringInfo(command, "%s PREPARED '%s'", + is_commit ? "COMMIT" : "ROLLBACK", + fdwxact_id); + + /* + * Once the transaction is prepared, further transaction callback is not + * called even when an error occurred during resolving it. Therefore, we + * don't need to set changing_xact_state here. On failure the new connection + * will be established either when the new transaction is started or when + * checking the connection status above. + */ + res = pgfdw_exec_query(entry->conn, command->data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + int sqlstate; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + if (diag_sqlstate) + { + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + } + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * As core global transaction manager states, it's possible that the + * given foreign transaction doesn't exist on the foreign server. So + * we should accept an UNDEFINED_OBJECT error. + */ + if (sqlstate != ERRCODE_UNDEFINED_OBJECT) + pgfdw_report_error(ERROR, res, entry->conn, false, command->data); + } + + elog(DEBUG1, "%s prepared foreign transaction with ID %s", + is_commit ? "commit" : "rollback", + fdwxact_id); + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 707f1d7cd4..b7cae97600 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8974,19 +8974,6 @@ DROP OWNED BY regress_nosuper; DROP ROLE regress_nosuper; -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. -BEGIN; -SELECT count(*) FROM ft1; - count -------- - 822 -(1 row) - --- error here -PREPARE TRANSACTION 'fdw_tpc'; -ERROR: cannot PREPARE a distributed transaction which has operated on a foreign server not supporting two-phase commit protocol -ROLLBACK; -WARNING: there is no transaction in progress -- =================================================================== -- reestablish new connection -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index ad00a9ce2b..8162e0ace7 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -563,6 +563,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for foreign transactions */ routine->CommitForeignTransaction = postgresCommitForeignTransaction; routine->RollbackForeignTransaction = postgresRollbackForeignTransaction; + routine->PrepareForeignTransaction = postgresPrepareForeignTransaction; PG_RETURN_POINTER(routine); } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index d714034d6b..788605cfc2 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -140,6 +140,7 @@ extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); extern void postgresCommitForeignTransaction(FdwXactRslvState *frstate); extern void postgresRollbackForeignTransaction(FdwXactRslvState *frstate); +extern void postgresPrepareForeignTransaction(FdwXactRslvState *frstate); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 25dbc08b98..666f39210f 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2647,13 +2647,6 @@ DROP ROLE regress_nosuper; -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. -BEGIN; -SELECT count(*) FROM ft1; --- error here -PREPARE TRANSACTION 'fdw_tpc'; -ROLLBACK; - -- =================================================================== -- reestablish new connection -- =================================================================== -- 2.27.0