From 26790410870ac628738aa8884371208d78f8b1a7 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 28 Aug 2020 22:25:38 +0900 Subject: [PATCH v32 01/11] Introduce transaction manager for foreign transactions. The global transaciton manager manages the transactions initiated on the foreign server. This commit also adds both CommitForeignTransaction and RollbackForeignTransaction FDW APIs supporing only one-phase commit. FDW that implements these APIs can be managed by the global transaciton manager. So FDW is able to control its transaction using the foreign transaction manager, not using XactCallback. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/Makefile | 4 +- src/backend/access/fdwxact/Makefile | 17 ++ src/backend/access/fdwxact/fdwxact.c | 233 +++++++++++++++++++++++++++ src/backend/access/transam/xact.c | 10 ++ src/backend/foreign/foreign.c | 4 + src/include/access/fdwxact.h | 33 ++++ src/include/foreign/fdwapi.h | 12 ++ 7 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 src/backend/access/fdwxact/Makefile create mode 100644 src/backend/access/fdwxact/fdwxact.c create mode 100644 src/include/access/fdwxact.h diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile index 0880e0a8bb..2372a1a690 100644 --- a/src/backend/access/Makefile +++ b/src/backend/access/Makefile @@ -8,7 +8,7 @@ subdir = src/backend/access top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist \ - table tablesample transam +SUBDIRS = brin common fdwxact gin gist hash heap index nbtree rmgrdesc \ + spgist table tablesample transam include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/fdwxact/Makefile b/src/backend/access/fdwxact/Makefile new file mode 100644 index 0000000000..aacab1d729 --- /dev/null +++ b/src/backend/access/fdwxact/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/fdwxact +# +# IDENTIFICATION +# src/backend/access/fdwxact/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/fdwxact +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = fdwxact.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/fdwxact/fdwxact.c b/src/backend/access/fdwxact/fdwxact.c new file mode 100644 index 0000000000..00da860b31 --- /dev/null +++ b/src/backend/access/fdwxact/fdwxact.c @@ -0,0 +1,233 @@ +/*------------------------------------------------------------------------- + * + * fdwxact.c + * PostgreSQL global transaction manager for foreign servers. + * + * This module contains the code for managing transactions started on foreign + * servers. + * + * FDW who implements both commit and rollback APIs can request to register the + * foreign transaction by FdwXactRegisterXact() to participate it to a + * group of distributed tranasction. The registered foreign transactions are + * identified by OIDs of server and user. On commit and rollback, the global + * transaction manager calls corresponding FDW API to end the tranasctions. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/fdwxact/fdwxact.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdwxact.h" +#include "access/xlog.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" +#include "utils/memutils.h" + +/* Check the FdwXactParticipant is capable of two-phase commit */ +#define ServerSupportTransactionCallback(fdw_part) \ + (((FdwXactParticipant *)(fdw_part))->commit_foreign_xact_fn != NULL) + +/* + * Structure to bundle the foreign transaction participant. This struct + * needs to live until the end of transaction where we cannot look at + * syscaches. Therefore, this is allocated in the TopTransactionContext. + */ +typedef struct FdwXactParticipant +{ + /* Foreign server and user mapping info, passed to callback routines */ + ForeignServer *server; + UserMapping *usermapping; + + /* Callbacks for foreign transaction */ + CommitForeignTransaction_function commit_foreign_xact_fn; + RollbackForeignTransaction_function rollback_foreign_xact_fn; +} FdwXactParticipant; + +/* + * List of foreign transactions involved in the transaction. A member of + * participants must support both commit and rollback APIs. + */ +static List *FdwXactParticipants = NIL; + +static void ForgetAllFdwXactParticipants(void); +static void FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, + bool commit); +static FdwXactParticipant *create_fdwxact_participant(Oid serverid, Oid userid, + FdwRoutine *routine); + +/* + * Register the given foreign transaction identified by the given arguments + * as a participant of the transaction. + */ +void +FdwXactRegisterXact(Oid serverid, Oid userid) +{ + FdwXactParticipant *fdw_part; + MemoryContext old_ctx; + FdwRoutine *routine; + ListCell *lc; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (fdw_part->server->serverid == serverid && + fdw_part->usermapping->userid == userid) + { + /* Already registered */ + return; + } + } + + routine = GetFdwRoutineByServerId(serverid); + + /* + * Foreign server managed by the transaction manager must implement + * transaction callbacks. + */ + if (!routine->CommitForeignTransaction) + ereport(ERROR, + (errmsg("cannot register foreign server not supporting transaction callback"))); + + /* + * Participant's information is also used at the end of a transaction, + * where system cache are not available. Save it in TopTransactionContext + * so that these can live until the end of transaction. + */ + old_ctx = MemoryContextSwitchTo(TopTransactionContext); + + fdw_part = create_fdwxact_participant(serverid, userid, routine); + + /* Add to the participants list */ + FdwXactParticipants = lappend(FdwXactParticipants, fdw_part); + + /* Revert back the context */ + MemoryContextSwitchTo(old_ctx); +} + +/* Remove the given foreign server from FdwXactParticipants */ +void +FdwXactUnregisterXact(Oid serverid, Oid userid) +{ + ListCell *lc; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (fdw_part->server->serverid == serverid && + fdw_part->usermapping->userid == userid) + { + /* Remove the entry */ + FdwXactParticipants = + foreach_delete_current(FdwXactParticipants, lc); + break; + } + } +} + +/* Return palloc'd FdwXactParticipant variable */ +static FdwXactParticipant * +create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) +{ + FdwXactParticipant *fdw_part; + ForeignServer *foreign_server; + UserMapping *user_mapping; + + foreign_server = GetForeignServer(serverid); + user_mapping = GetUserMapping(userid, serverid); + + fdw_part = (FdwXactParticipant *) palloc(sizeof(FdwXactParticipant)); + + fdw_part->server = foreign_server; + fdw_part->usermapping = user_mapping; + fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction; + fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; + + return fdw_part; +} + +/* + * The routine for committing or rolling back the given transaction participant. + */ +static void +FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit) +{ + FdwXactRslvState state; + + Assert(ServerSupportTransactionCallback(fdw_part)); + + state.server = fdw_part->server; + state.usermapping = fdw_part->usermapping; + state.flags = FDWXACT_FLAG_ONEPHASE; + + if (commit) + { + fdw_part->commit_foreign_xact_fn(&state); + elog(DEBUG1, "successfully committed the foreign transaction for server %u user %u", + fdw_part->usermapping->serverid, + fdw_part->usermapping->userid); + } + else + { + fdw_part->rollback_foreign_xact_fn(&state); + elog(DEBUG1, "successfully rolled back the foreign transaction for server %u user %u", + fdw_part->usermapping->serverid, + fdw_part->usermapping->userid); + } +} + +/* + * Clear the FdwXactParticipants list. + */ +static void +ForgetAllFdwXactParticipants(void) +{ + if (FdwXactParticipants == NIL) + return; + + list_free_deep(FdwXactParticipants); + FdwXactParticipants = NIL; +} + +/* + * Commit or rollback all foreign transactions. + */ +void +AtEOXact_FdwXact(bool is_commit) +{ + ListCell *lc; + + /* If there are no foreign servers involved, we have no business here */ + if (FdwXactParticipants == NIL) + return; + + Assert(!RecoveryInProgress()); + + /* Commit or rollback foreign transactions in the participant list */ + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + Assert(ServerSupportTransactionCallback(fdw_part)); + FdwXactParticipantEndTransaction(fdw_part, is_commit); + } + + ForgetAllFdwXactParticipants(); +} + +/* + * Check if the local transaction has any foreign transaction. + */ +void +PrePrepare_FdwXact(void) +{ + /* We don't support to prepare foreign transactions */ + if (FdwXactParticipants != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has operated on foreign tables"))); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a2068e3fd4..b8990af8b6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -2230,6 +2231,9 @@ CommitTransaction(void) CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT : XACT_EVENT_COMMIT); + /* Commit foreign transaction if any */ + AtEOXact_FdwXact(true); + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, true, true); @@ -2369,6 +2373,9 @@ PrepareTransaction(void) * the transaction-abort path. */ + /* Prepare foreign trasactions */ + PrePrepare_FdwXact(); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2756,6 +2763,9 @@ AbortTransaction(void) else CallXactCallbacks(XACT_EVENT_ABORT); + /* Rollback foreign transactions if any */ + AtEOXact_FdwXact(false); + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, true); diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 5564dc3a1e..d50dc099c6 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -328,6 +328,10 @@ GetFdwRoutine(Oid fdwhandler) elog(ERROR, "foreign-data wrapper handler function %u did not return an FdwRoutine struct", fdwhandler); + /* The FDW must support either both APIs or neither */ + Assert((routine->CommitForeignTransaction && routine->RollbackForeignTransaction) || + (!routine->CommitForeignTransaction && !routine->RollbackForeignTransaction)); + return routine; } diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h new file mode 100644 index 0000000000..6c8b111ab5 --- /dev/null +++ b/src/include/access/fdwxact.h @@ -0,0 +1,33 @@ +/* + * fdwxact.h + * + * PostgreSQL global transaction manager + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/access/fdwxact.h + */ +#ifndef FDWXACT_H +#define FDWXACT_H + +#include "foreign/foreign.h" + +/* Flag passed to FDW transaction management APIs */ +#define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback + * without preparation */ + +/* State data for foreign transaction resolution, passed to FDW callbacks */ +typedef struct FdwXactRslvState +{ + /* Foreign transaction information */ + ForeignServer *server; + UserMapping *usermapping; + + int flags; /* OR of FDWXACT_FLAG_xx flags */ +} FdwXactRslvState; + +/* Function declarations */ +extern void AtEOXact_FdwXact(bool is_commit); +extern void PrePrepare_FdwXact(void); + +#endif /* FDWXACT_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 2953499fb1..570e605e1a 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -13,6 +13,7 @@ #define FDWAPI_H #include "access/parallel.h" +#include "access/fdwxact.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" @@ -170,6 +171,9 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef void (*CommitForeignTransaction_function) (FdwXactRslvState *frstate); +typedef void (*RollbackForeignTransaction_function) (FdwXactRslvState *frstate); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -246,6 +250,10 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. */ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for transaction management */ + CommitForeignTransaction_function CommitForeignTransaction; + RollbackForeignTransaction_function RollbackForeignTransaction; } FdwRoutine; @@ -259,4 +267,8 @@ extern bool IsImportableForeignTable(const char *tablename, ImportForeignSchemaStmt *stmt); extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); +/* Functions in fdwxact/fdwxact.c */ +extern void FdwXactRegisterXact(Oid serverid, Oid userid); +extern void FdwXactUnregisterXact(Oid serverid, Oid userid); + #endif /* FDWAPI_H */ -- 2.27.0