From 7c9a8d6b41a524e211a414e147a3918e95a7b544 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 25 Nov 2020 21:02:29 +0900 Subject: [PATCH v32 08/11] Prepare foreign transactions at commit time With this commit, the foreign server modified within the transaction marked as 'modified'. On the 'modified' servers, foreign transactions are prepared automatically if foreign_twophase_commit is 'required'. Previously, users need to do PREPARE TRANSACTION and COMMIT/ROLLBACK PREPARED to use two-phase commit protocol. This commit enables users to use two-phase commit protocol transparently. Prepared foreign transactions are resolved in asynchronous manner by foreign transaction resolver process. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/fdwxact/fdwxact.c | 191 +++++++++++++++++- src/backend/access/transam/xact.c | 7 + src/backend/utils/misc/guc.c | 28 +++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/fdwxact.h | 10 + src/include/foreign/fdwapi.h | 2 +- 6 files changed, 229 insertions(+), 11 deletions(-) diff --git a/src/backend/access/fdwxact/fdwxact.c b/src/backend/access/fdwxact/fdwxact.c index 7fc199cc55..adc81499e9 100644 --- a/src/backend/access/fdwxact/fdwxact.c +++ b/src/backend/access/fdwxact/fdwxact.c @@ -19,13 +19,27 @@ * * FOREIGN TRANSACTION RESOLUTION * + * The transaction involving multiple foreign transactions uses two-phase commit + * protocol to commit the distributed transaction if enabled. The basic strategy + * is that we prepare all of the remote transactions before committing locally and + * commit them after committing locally. + * + * At pre-commit of local transaction, we prepare the transactions on all foreign + * servers after logging the information of foreign transaction. The result of + * distributed transaction is determined by the result of the corresponding local + * transaction. Once the local transaction is successfully committed, all + * transactions on foreign servers must be committed. In case where an error occurred + * before the local transaction commit all transactions must be aborted. After + * committing or rolling back locally, we leave foreign transactions as in-doubt + * transactions and then notify the resolver process. The resolver process asynchronously + * resolves these foreign transactions according to the result of the corresponding local + * transaction. Also, the user can use pg_resolve_foreign_xact() SQL function to + * resolve a foreign transaction manually. + * * At PREPARE TRANSACTION, we prepare all transactions on foreign servers by executing * PrepareForeignTransaction() API regardless of data on the foreign server having been * modified. At COMMIT PREPARED and ROLLBACK PREPARED, we commit or rollback only the - * local transaction but not do anything for involved foreign transactions. The preapred - * foreign transactions are resolved by a resolver process asynchronously. Also, the - * user can use pg_resolve_foreign_xact() SQL function to resolve a foreign transaction - * manually. + * local transaction but not do anything for involved foreign transactions. * * LOCKING * @@ -92,8 +106,10 @@ #include "storage/ipc.h" #include "storage/latch.h" #include "storage/lock.h" +#include "storage/pmsignal.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -105,6 +121,10 @@ #define ServerSupportTwophaseCommit(fdw_part) \ (((FdwXactParticipant *)(fdw_part))->prepare_foreign_xact_fn != NULL) +/* Foreign twophase commit is enabled and requested by user */ +#define IsForeignTwophaseCommitRequested() \ + (foreign_twophase_commit > FOREIGN_TWOPHASE_COMMIT_DISABLED) + /* Directory where the foreign prepared transaction files will reside */ #define FDWXACTS_DIR "pg_fdwxact" @@ -142,6 +162,9 @@ typedef struct FdwXactParticipant /* Transaction identifier used for PREPARE */ char *fdwxact_id; + /* true if modified the data on the server */ + bool modified; + /* Callbacks for foreign transaction */ CommitForeignTransaction_function commit_foreign_xact_fn; RollbackForeignTransaction_function rollback_foreign_xact_fn; @@ -152,18 +175,24 @@ typedef struct FdwXactParticipant /* * List of foreign transactions involved in the transaction. A member of * participants must support both commit and rollback APIs. + * + * ForeignTwophaseCommitIsRequired is true if the current transaction needs to + * be committed using two-phase commit protocol. */ static List *FdwXactParticipants = NIL; +static bool ForeignTwophaseCommitIsRequired = false; /* Keep track of registering process exit call back. */ static bool fdwXactExitRegistered = false; + /* Guc parameter */ int max_prepared_foreign_xacts = 0; int max_foreign_xact_resolvers = 0; +int foreign_twophase_commit = FOREIGN_TWOPHASE_COMMIT_DISABLED; static void AtProcExit_FdwXact(int code, Datum arg); -static void FdwXactPrepareForeignTransactions(TransactionId xid); +static void FdwXactPrepareForeignTransactions(TransactionId xid, bool prepare_all); static void ForgetAllFdwXactParticipants(void); static void FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit); @@ -182,6 +211,7 @@ static char *ProcessFdwXactBuffer(Oid dbid, TransactionId xid, Oid serverid, static char *ReadFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid); static void RemoveFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid, bool giveWarning); +static bool checkForeignTwophaseCommitRequired(bool local_modified); static FdwXact insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, Oid umid, char *fdwxact_id); static void remove_fdwxact(FdwXact fdwxact); @@ -258,7 +288,7 @@ FdwXactShmemInit(void) * as a participant of the transaction. */ void -FdwXactRegisterXact(Oid serverid, Oid userid) +FdwXactRegisterXact(Oid serverid, Oid userid, bool modified) { FdwXactParticipant *fdw_part; MemoryContext old_ctx; @@ -273,6 +303,7 @@ FdwXactRegisterXact(Oid serverid, Oid userid) fdw_part->usermapping->userid == userid) { /* Already registered */ + fdw_part->modified |= modified; return; } } @@ -302,6 +333,7 @@ FdwXactRegisterXact(Oid serverid, Oid userid) old_ctx = MemoryContextSwitchTo(TopTransactionContext); fdw_part = create_fdwxact_participant(serverid, userid, routine); + fdw_part->modified = modified; /* Add to the participants list */ FdwXactParticipants = lappend(FdwXactParticipants, fdw_part); @@ -348,6 +380,7 @@ create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) fdw_part->server = foreign_server; fdw_part->usermapping = user_mapping; fdw_part->fdwxact_id = NULL; + fdw_part->modified = false; fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction; fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; fdw_part->prepare_foreign_xact_fn = routine->PrepareForeignTransaction; @@ -356,11 +389,139 @@ create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) return fdw_part; } + /* + * Prepare all foreign transactions if foreign twophase commit is required. + * When foreign twophase commit is enabled, the behavior depends on the value + * of foreign_twophase_commit; when 'required' we strictly require for all + * foreign servers' FDW to support two-phase commit protocol and ask them to + * prepare foreign transactions, and when 'disabled' since we use one-phase + * commit these foreign transactions are committed at the transaction end. + * If we failed to prepare any of them we change to aborting. + */ +void +PreCommit_FdwXact(void) +{ + TransactionId xid; + bool local_modified; + + /* If there are no foreign servers involved, we have no business here */ + if (FdwXactParticipants == NIL) + return; + + Assert(!RecoveryInProgress()); + + /* + * Check if the current transaction did writes. We need to include the + * local node to the distributed transaction participant and to regard it + * as modified, if the current transaction has performed WAL logging and + * has assigned an xid. The transaction can end up not writing any WAL, + * even if it has an xid, if it only wrote to temporary and/or unlogged + * tables. It can end up having written WAL without an xid if did HOT + * pruning. + */ + xid = GetTopTransactionIdIfAny(); + local_modified = (TransactionIdIsValid(xid) && (XactLastRecEnd != 0)); + + /* + * Check if we need to use foreign twophase commit. Note that we don't + * support foreign twophase commit in single user mode. + */ + if (IsUnderPostmaster && checkForeignTwophaseCommitRequired(local_modified)) + { + /* + * Two-phase commit is required. Assign a transaction id to the + * current transaction if not yet because the local transaction is + * necessary to determine the result of the distributed transaction. + * Then we prepare foreign transactions on foreign servers that support + * two-phase commit. Note that we keep FdwXactParticipants until the + * end of the transaction. + */ + if (!TransactionIdIsValid(xid)) + xid = GetTopTransactionId(); + FdwXactPrepareForeignTransactions(xid, false); + ForeignTwophaseCommitIsRequired = true; + } +} + +/* Return true if the current transaction needs to use two-phase commit */ +bool +FdwXactIsForeignTwophaseCommitRequired(void) +{ + return ForeignTwophaseCommitIsRequired; +} + +/* + * Return true if the current transaction modifies data on two or more servers + * in FdwXactParticipants and local server itself. + */ +static bool +checkForeignTwophaseCommitRequired(bool local_modified) +{ + ListCell *lc; + bool have_notwophase = false; + int nserverswritten = 0; + + if (!IsForeignTwophaseCommitRequested()) + return false; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (!fdw_part->modified) + continue; + + if (!ServerSupportTwophaseCommit(fdw_part)) + have_notwophase = true; + + nserverswritten++; + } + + /* Did we modify the local non-temporary data? */ + if (local_modified) + nserverswritten++; + + /* + * Two-phase commit is not required if the number of servers performed + * writes is less than 2. + */ + if (nserverswritten < 2) + return false; + + Assert(foreign_twophase_commit == FOREIGN_TWOPHASE_COMMIT_REQUIRED); + + /* Two-phase commit is required. Check parameters */ + if (max_prepared_foreign_xacts == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign two-phase commit is required but prepared foreign transactions are disabled"), + errhint("Set max_prepared_foreign_transactions to a nonzero value."))); + + if (max_foreign_xact_resolvers == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign two-phase commit is required but prepared foreign transactions are disabled"), + errhint("Set max_foreign_transaction_resolvers to a nonzero value."))); + + if (have_notwophase) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot process a distributed transaction that has operated on a foreign server that does not support two-phase commit protocol"), + errdetail("foreign_twophase_commit is \'required\' but the transaction has some foreign servers which are not capable of two-phase commit"))); + + return true; +} + /* - * Insert FdwXact entries and prepare foreign transactions. + * Insert FdwXact entries and prepare foreign transactions. If prepare_all is + * true, we prepare all foreign transaction regardless of writes having happened + * on the server. + * + * We still can change to rollback here on failure. If any error occurs, we + * rollback non-prepared foreign transactions. */ static void -FdwXactPrepareForeignTransactions(TransactionId xid) +FdwXactPrepareForeignTransactions(TransactionId xid, bool prepare_all) { ListCell *lc; @@ -378,6 +539,9 @@ FdwXactPrepareForeignTransactions(TransactionId xid) CHECK_FOR_INTERRUPTS(); + if (!prepare_all && !fdw_part->modified) + continue; + /* Get prepared transaction identifier */ fdw_part->fdwxact_id = get_fdwxact_identifier(fdw_part, xid); Assert(fdw_part->fdwxact_id); @@ -755,7 +919,10 @@ ForgetAllFdwXactParticipants(void) int nlefts = 0; if (FdwXactParticipants == NIL) + { + Assert(!ForeignTwophaseCommitIsRequired); return; + } foreach(cell, FdwXactParticipants) { @@ -812,7 +979,10 @@ AtEOXact_FdwXact(bool is_commit) if (!fdwxact) { - /* Commit or rollback the foreign transaction in one-phase */ + /* + * If this participant doesn't have an FdwXact entry, it's not + * prepared yet. Therefore we can commit or rollback it in one-phase. + */ Assert(ServerSupportTransactionCallback(fdw_part)); FdwXactParticipantEndTransaction(fdw_part, is_commit); continue; @@ -842,6 +1012,7 @@ AtEOXact_FdwXact(bool is_commit) } ForgetAllFdwXactParticipants(); + ForeignTwophaseCommitIsRequired = false; } /* @@ -881,7 +1052,7 @@ PrePrepare_FdwXact(void) * prepare all foreign transactions. */ xid = GetTopTransactionId(); - FdwXactPrepareForeignTransactions(xid); + FdwXactPrepareForeignTransactions(xid, true); /* * We keep FdwXactParticipants until the transaction end so that we change diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a87f6b5abf..657778d926 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -22,6 +22,7 @@ #include "access/commit_ts.h" #include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1456,6 +1457,9 @@ RecordTransactionCommit(void) if (wrote_xlog && markXidCommitted) SyncRepWaitForLSN(XactLastRecEnd, true); + if (FdwXactIsForeignTwophaseCommitRequired()) + FdwXactLaunchOrWakeupResolver(); + /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -2123,6 +2127,9 @@ CommitTransaction(void) * the transaction-abort path. */ + /* Pre-commit step for foreign transactions */ + PreCommit_FdwXact(); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index add8e598e8..f530cd20dd 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -501,6 +501,24 @@ static struct config_enum_entry shared_memory_options[] = { {NULL, 0, false} }; +/* + * Although only "required" and "disabled" are documented, we accept all + * the likely variants of "on" and "off". + */ +static const struct config_enum_entry foreign_twophase_commit_options[] = { + {"required", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"disabled", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"on", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"off", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"true", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"false", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"yes", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"no", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"1", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"0", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {NULL, 0, false} +}; + /* * Options for enum values stored in other modules */ @@ -4703,6 +4721,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, assign_synchronous_commit, NULL }, + { + {"foreign_twophase_commit", PGC_USERSET, FOREIGN_TRANSACTION, + gettext_noop("Use of foreign twophase commit for the current transaction."), + NULL + }, + &foreign_twophase_commit, + FOREIGN_TWOPHASE_COMMIT_DISABLED, foreign_twophase_commit_options, + NULL, NULL, NULL + }, + { {"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING, gettext_noop("Allows archiving of WAL files using archive_command."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 58ac54b8c8..6165c6d689 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -746,6 +746,8 @@ # retrying to resolve # foreign transactions # after a failed attempt +#foreign_twophase_commit = disabled # use two-phase commit for distributed transactions: + # disabled or required #------------------------------------------------------------------------------ # VERSION AND PLATFORM COMPATIBILITY diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h index a3763e52c0..6bf4f5dd7d 100644 --- a/src/include/access/fdwxact.h +++ b/src/include/access/fdwxact.h @@ -20,6 +20,14 @@ #define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback * without preparation */ +/* Enum for foreign_twophase_commit parameter */ +typedef enum +{ + FOREIGN_TWOPHASE_COMMIT_DISABLED, /* disable foreign twophase commit */ + FOREIGN_TWOPHASE_COMMIT_REQUIRED /* all foreign servers have to support + * twophase commit */ +} ForeignTwophaseCommitLevel; + /* Enum to track the status of foreign transaction */ typedef enum { @@ -107,10 +115,12 @@ extern int max_prepared_foreign_xacts; extern int max_foreign_xact_resolvers; extern int foreign_xact_resolution_retry_interval; extern int foreign_xact_resolver_timeout; +extern int foreign_twophase_commit; /* Function declarations */ extern Size FdwXactShmemSize(void); extern void FdwXactShmemInit(void); +extern void PreCommit_FdwXact(void); extern void AtEOXact_FdwXact(bool is_commit); extern void PrePrepare_FdwXact(void); extern bool FdwXactIsForeignTwophaseCommitRequired(void); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 7885827172..5fd51b408c 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -273,7 +273,7 @@ extern bool IsImportableForeignTable(const char *tablename, extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); /* Functions in fdwxact/fdwxact.c */ -extern void FdwXactRegisterXact(Oid serverid, Oid userid); +extern void FdwXactRegisterXact(Oid serverid, Oid userid, bool modified); extern void FdwXactUnregisterXact(Oid serverid, Oid userid); #endif /* FDWAPI_H */ -- 2.27.0