From 5be5729cdf9481051e9e95db10d9321735139b32 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Sun, 20 Sep 2020 16:49:20 +0900 Subject: [PATCH v32 04/11] Add PrepareForeignTransaction API. This commits add a new FDW API, PrepareForeignTransaction. Using this API, the transactions initiated on the foreign server are preapred at PREPARE TRANSACTION time. The information of prepared foreign transactions involved with the distributed transaction is crash-safe. However these functions are neither committed nor aborted at COMMIT/ROLLBACK PREPARED time. To resolve these transactions, this commit also adds pg_resolve_foreign_xact() SQL function. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- .../postgres_fdw/expected/postgres_fdw.out | 2 +- src/backend/access/fdwxact/fdwxact.c | 1754 ++++++++++++++++- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/fdwxactdesc.c | 58 + src/backend/access/rmgrdesc/xlogdesc.c | 6 +- src/backend/access/transam/rmgr.c | 1 + src/backend/access/transam/twophase.c | 28 + src/backend/access/transam/xact.c | 1 + src/backend/access/transam/xlog.c | 41 +- src/backend/catalog/system_views.sql | 3 + src/backend/commands/foreigncmds.c | 22 + src/backend/foreign/foreign.c | 6 + src/backend/postmaster/pgstat.c | 9 + src/backend/postmaster/postmaster.c | 1 + src/backend/replication/logical/decode.c | 1 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 56 +- src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/utils/misc/guc.c | 11 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/bin/initdb/initdb.c | 1 + src/bin/pg_controldata/pg_controldata.c | 2 + src/bin/pg_resetwal/pg_resetwal.c | 2 + src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/fdwxact.h | 88 + src/include/access/fdwxact_xlog.h | 54 + src/include/access/rmgrlist.h | 1 + src/include/access/twophase.h | 1 + src/include/access/xlog_internal.h | 1 + src/include/catalog/pg_control.h | 1 + src/include/catalog/pg_proc.dat | 18 + src/include/foreign/fdwapi.h | 2 + src/include/pgstat.h | 3 + src/include/storage/procarray.h | 2 + src/test/regress/expected/rules.out | 7 + 35 files changed, 2163 insertions(+), 28 deletions(-) create mode 100644 src/backend/access/rmgrdesc/fdwxactdesc.c create mode 100644 src/include/access/fdwxact_xlog.h diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 3724fdab3d..707f1d7cd4 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8984,7 +8984,7 @@ SELECT count(*) FROM ft1; -- error here PREPARE TRANSACTION 'fdw_tpc'; -ERROR: cannot PREPARE a transaction that has operated on foreign tables +ERROR: cannot PREPARE a distributed transaction which has operated on a foreign server not supporting two-phase commit protocol ROLLBACK; WARNING: there is no transaction in progress -- =================================================================== diff --git a/src/backend/access/fdwxact/fdwxact.c b/src/backend/access/fdwxact/fdwxact.c index 00da860b31..cbbd53dc7d 100644 --- a/src/backend/access/fdwxact/fdwxact.c +++ b/src/backend/access/fdwxact/fdwxact.c @@ -9,8 +9,59 @@ * FDW who implements both commit and rollback APIs can request to register the * foreign transaction by FdwXactRegisterXact() to participate it to a * group of distributed tranasction. The registered foreign transactions are - * identified by OIDs of server and user. On commit and rollback, the global - * transaction manager calls corresponding FDW API to end the tranasctions. + * identified by OIDs of server and user. On commit, rollback and prepare, the + * global transaction manager calls corresponding FDW API to end the tranasctions. + * + * To achieve commit among all foreign servers atomically, the global transaction + * manager supports two-phase commit protocol, which is a type of atomic commitment + * protocol(ACP). Two-phase commit protocol is crash-safe. We WAL logs the foreign + * transaction information. + * + * FOREIGN TRANSACTION RESOLUTION + * + * At PREPARE TRANSACTION, we prepare all transactions on foreign servers by executing + * PrepareForeignTransaction() API regardless of data on the foreign server having been + * modified. At COMMIT PREPARED and ROLLBACK PREPARED, we commit or rollback only the + * local transaction but not do anything for involved foreign transactions. To resolve + * these foreign transactions the user needs to use pg_resolve_foreign_xact() SQL + * function that resolve a foreign transaction according to the result of the + * corresponding local transaction. + * + * LOCKING + * + * Whenever a foreign transaction is processed, the corresponding FdwXact + * entry is update. To avoid holding the lock during transaction processing + * which may take an unpredicatable time the in-memory data of foreign + * transaction follows a locking model based on the following linked concepts: + * + * * All FdwXact fields except for status are protected by FdwXactLock. The + * status is protected by its mutex. + * * A process who is going to process foreign transaction needs to set + * locking_backend of the FdwXact entry to lock the entry, which prevents the entry from + * being updated and removed by concurrent processes. + * + * RECOVERY + * + * During replay WAL and replication FdwXactCtl also holds information about + * active prepared foreign transaction that haven't been moved to disk yet. + * + * Replay of fdwxact records happens by the following rules: + * + * * At the beginning of recovery, pg_fdwxacts is scanned once, filling FdwXact + * with entries marked with fdwxact->inredo and fdwxact->ondisk. FdwXact file + * data older than the XID horizon of the redo position are discarded. + * * On PREPARE redo, the foreign transaction is added to FdwXactCtl->fdwxacts. + * We set fdwxact->inredo to true for such entries. + * * On Checkpoint we iterate through FdwXactCtl->fdwxacts entries that + * have fdwxact->inredo set and are behind the redo_horizon. We save + * them to disk and then set fdwxact->ondisk to true. + * * On resolution we delete the entry from FdwXactCtl->fdwxacts. If + * fdwxact->ondisk is true, the corresponding entry from the disk is + * additionally deleted. + * * RecoverFdwXacts() and PrescanFdwXacts() have been modified to go through + * fdwxact->inredo entries that have not made it to disk. + * + * These replay rules are borrowed from twophase.c * * Portions Copyright (c) 2020, PostgreSQL Global Development Group * @@ -20,15 +71,53 @@ */ #include "postgres.h" +#include +#include +#include + #include "access/fdwxact.h" +#include "access/twophase.h" +#include "access/xact.h" #include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" #include "foreign/fdwapi.h" #include "foreign/foreign.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/procarray.h" +#include "storage/sinvaladt.h" +#include "utils/builtins.h" +#include "utils/guc.h" #include "utils/memutils.h" +#include "utils/rel.h" /* Check the FdwXactParticipant is capable of two-phase commit */ #define ServerSupportTransactionCallback(fdw_part) \ (((FdwXactParticipant *)(fdw_part))->commit_foreign_xact_fn != NULL) +#define ServerSupportTwophaseCommit(fdw_part) \ + (((FdwXactParticipant *)(fdw_part))->prepare_foreign_xact_fn != NULL) + +/* Directory where the foreign prepared transaction files will reside */ +#define FDWXACTS_DIR "pg_fdwxact" + +/* + * Name of foreign prepared transaction file is 8 bytes database oid, + * xid, foreign server oid and user oid separated by '_'. + * + * Since FdwXact stat file is created per foreign transaction in a + * distributed transaction and the xid of unresolved distributed + * transaction never reused, the name is fairly enough to ensure + * uniqueness. + */ +#define FDWXACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8 + 1 + 8) +#define FdwXactFilePath(path, dbid, xid, serverid, userid) \ + snprintf(path, MAXPGPATH, FDWXACTS_DIR "/%08X_%08X_%08X_%08X", \ + dbid, xid, serverid, userid) /* * Structure to bundle the foreign transaction participant. This struct @@ -37,13 +126,23 @@ */ typedef struct FdwXactParticipant { + /* + * Pointer to a FdwXact entry in the global array. NULL if the entry is + * not inserted yet but this is registered as a participant. + */ + FdwXact fdwxact; + /* Foreign server and user mapping info, passed to callback routines */ ForeignServer *server; UserMapping *usermapping; + /* Transaction identifier used for PREPARE */ + char *fdwxact_id; + /* Callbacks for foreign transaction */ CommitForeignTransaction_function commit_foreign_xact_fn; RollbackForeignTransaction_function rollback_foreign_xact_fn; + PrepareForeignTransaction_function prepare_foreign_xact_fn; } FdwXactParticipant; /* @@ -52,11 +151,103 @@ typedef struct FdwXactParticipant */ static List *FdwXactParticipants = NIL; +/* Keep track of registering process exit call back. */ +static bool fdwXactExitRegistered = false; + +/* Guc parameter */ +int max_prepared_foreign_xacts = 0; + +static void AtProcExit_FdwXact(int code, Datum arg); +static void FdwXactPrepareForeignTransactions(TransactionId xid); static void ForgetAllFdwXactParticipants(void); static void FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit); +static FdwXact FdwXactInsertEntry(TransactionId xid, + FdwXactParticipant *fdw_part); +static void FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts); +static void FdwXactComputeRequiredXmin(void); +static FdwXactStatus FdwXactGetTransactionFate(TransactionId xid); +static void FdwXactResolveOneFdwXact(FdwXact fdwxact); +static void FdwXactRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn); +static void FdwXactRedoRemove(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, bool givewarning); +static void XlogReadFdwXactData(XLogRecPtr lsn, char **buf, int *len); +static char *ProcessFdwXactBuffer(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, XLogRecPtr insert_start_lsn, + bool fromdisk); +static char *ReadFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid); +static void RemoveFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + bool giveWarning); +static FdwXact insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdwxact_id); +static void remove_fdwxact(FdwXact fdwxact); static FdwXactParticipant *create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine); +static char *get_fdwxact_identifier(FdwXactParticipant *fdw_part, + TransactionId xid); +static int get_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid); + +/* + * Calculates the size of shared memory allocated for maintaining foreign + * prepared transaction entries. + */ +Size +FdwXactShmemSize(void) +{ + Size size; + + /* Size for foreign transaction information array */ + size = offsetof(FdwXactCtlData, fdwxacts); + size = add_size(size, mul_size(max_prepared_foreign_xacts, + sizeof(FdwXact))); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_prepared_foreign_xacts, + sizeof(FdwXactData))); + + return size; +} + +/* + * Initialization of shared memory for maintaining foreign prepared transaction + * entries. The shared memory layout is defined in definition of FdwXactCtlData + * structure. + */ +void +FdwXactShmemInit(void) +{ + bool found; + + FdwXactCtl = ShmemInitStruct("Foreign transactions table", + FdwXactShmemSize(), + &found); + if (!IsUnderPostmaster) + { + FdwXact fdwxacts; + int cnt; + + Assert(!found); + FdwXactCtl->free_fdwxacts = NULL; + FdwXactCtl->num_fdwxacts = 0; + + /* Initialize the linked list of free FDW transactions */ + fdwxacts = (FdwXact) + ((char *) FdwXactCtl + + MAXALIGN(offsetof(FdwXactCtlData, fdwxacts) + + sizeof(FdwXact) * max_prepared_foreign_xacts)); + for (cnt = 0; cnt < max_prepared_foreign_xacts; cnt++) + { + fdwxacts[cnt].status = FDWXACT_STATUS_INVALID; + fdwxacts[cnt].fdwxact_free_next = FdwXactCtl->free_fdwxacts; + FdwXactCtl->free_fdwxacts = &fdwxacts[cnt]; + SpinLockInit(&fdwxacts[cnt].mutex); + } + } + else + { + Assert(FdwXactCtl); + Assert(found); + } +} /* * Register the given foreign transaction identified by the given arguments @@ -82,6 +273,13 @@ FdwXactRegisterXact(Oid serverid, Oid userid) } } + /* on first call, register the exit hook */ + if (!fdwXactExitRegistered) + { + before_shmem_exit(AtProcExit_FdwXact, 0); + fdwXactExitRegistered = true; + } + routine = GetFdwRoutineByServerId(serverid); /* @@ -142,14 +340,336 @@ create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) fdw_part = (FdwXactParticipant *) palloc(sizeof(FdwXactParticipant)); + fdw_part->fdwxact = NULL; fdw_part->server = foreign_server; fdw_part->usermapping = user_mapping; + fdw_part->fdwxact_id = NULL; fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction; fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; + fdw_part->prepare_foreign_xact_fn = routine->PrepareForeignTransaction; return fdw_part; } +/* + * Insert FdwXact entries and prepare foreign transactions. + */ +static void +FdwXactPrepareForeignTransactions(TransactionId xid) +{ + ListCell *lc; + + Assert(FdwXactParticipants != NIL); + Assert(TransactionIdIsValid(xid)); + + /* Loop over the foreign connections */ + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + FdwXactRslvState state; + FdwXact fdwxact; + + Assert(ServerSupportTwophaseCommit(fdw_part)); + + CHECK_FOR_INTERRUPTS(); + + /* Get prepared transaction identifier */ + fdw_part->fdwxact_id = get_fdwxact_identifier(fdw_part, xid); + Assert(fdw_part->fdwxact_id); + + /* + * Insert the foreign transaction entry with the + * FDWXACT_STATUS_PREPARING status. Registration persists this + * information to the disk and logs (that way relaying it on standby). + * Thus in case we loose connectivity to the foreign server or crash + * ourselves, we will remember that we might have prepared transaction + * on the foreign server and try to resolve it when connectivity is + * restored or after crash recovery. + * + * If we prepare the transaction on the foreign server before + * persisting the information to the disk and crash in-between these + * two steps, we will lost the prepared transaction on the foreign + * server and will not be able to resolve it after the crash recovery. + * Hence persist first then prepare. + */ + fdwxact = FdwXactInsertEntry(xid, fdw_part); + + /* + * Prepare the foreign transaction. + * + * Between FdwXactInsertEntry call till this backend hears + * acknowledge from foreign server, the backend may abort the local + * transaction (say, because of a signal). + */ + state.server = fdw_part->server; + state.usermapping = fdw_part->usermapping; + state.fdwxact_id = fdw_part->fdwxact_id; + fdw_part->prepare_foreign_xact_fn(&state); + + /* succeeded, update status */ + SpinLockAcquire(&fdwxact->mutex); + fdwxact->status = FDWXACT_STATUS_PREPARED; + SpinLockRelease(&fdwxact->mutex); + } +} + +/* + * Return a null-terminated foreign transaction identifier. We generate an + * unique identifier with in the form of + * "fx____ whose length is + * less than FDWXACT_ID_MAX_LEN. + * + * Returned string value is used to identify foreign transaction. The + * identifier should not be same as any other concurrent prepared transaction + * identifier. + * + * To make the foreign transactionid unique, we should ideally use something + * like UUID, which gives unique ids with high probability, but that may be + * expensive here and UUID extension which provides the function to generate + * UUID is not part of the core code. + */ +static char * +get_fdwxact_identifier(FdwXactParticipant *fdw_part, TransactionId xid) +{ + char buf[FDWXACT_ID_MAX_LEN] = {0}; + + snprintf(buf, FDWXACT_ID_MAX_LEN, "fx_%ld_%u_%d_%d", + Abs(random()), xid, fdw_part->server->serverid, + fdw_part->usermapping->userid); + + return pstrdup(buf); +} + +/* + * This function is used to create new foreign transaction entry before an FDW + * prepares and commit/rollback. The function adds the entry to WAL and it will + * be persisted to the disk under pg_fdwxact directory when checkpoint. + */ +static FdwXact +FdwXactInsertEntry(TransactionId xid, FdwXactParticipant *fdw_part) +{ + FdwXact fdwxact; + FdwXactOnDiskData *fdwxact_file_data; + MemoryContext old_context; + int data_len; + + old_context = MemoryContextSwitchTo(TopTransactionContext); + + /* + * Enter the foreign transaction in the shared memory structure. + */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact = insert_fdwxact(MyDatabaseId, xid, fdw_part->server->serverid, + fdw_part->usermapping->userid, + fdw_part->usermapping->umid, fdw_part->fdwxact_id); + fdwxact->locking_backend = MyBackendId; + LWLockRelease(FdwXactLock); + + fdw_part->fdwxact = fdwxact; + MemoryContextSwitchTo(old_context); + + /* + * Prepare to write the entry to a file. Also add xlog entry. The contents + * of the xlog record are same as what is written to the file. + */ + data_len = offsetof(FdwXactOnDiskData, fdwxact_id); + data_len = data_len + strlen(fdw_part->fdwxact_id) + 1; + data_len = MAXALIGN(data_len); + fdwxact_file_data = (FdwXactOnDiskData *) palloc0(data_len); + fdwxact_file_data->dbid = MyDatabaseId; + fdwxact_file_data->local_xid = xid; + fdwxact_file_data->serverid = fdw_part->server->serverid; + fdwxact_file_data->userid = fdw_part->usermapping->userid; + fdwxact_file_data->umid = fdw_part->usermapping->umid; + memcpy(fdwxact_file_data->fdwxact_id, fdw_part->fdwxact_id, + strlen(fdw_part->fdwxact_id) + 1); + + /* See note in RecordTransactionCommit */ + MyProc->delayChkpt = true; + + START_CRIT_SECTION(); + + /* Add the entry in the xlog and save LSN for checkpointer */ + XLogBeginInsert(); + XLogRegisterData((char *) fdwxact_file_data, data_len); + fdwxact->insert_end_lsn = XLogInsert(RM_FDWXACT_ID, XLOG_FDWXACT_INSERT); + XLogFlush(fdwxact->insert_end_lsn); + + /* If we crash now, we have prepared: WAL replay will fix things */ + + /* Store record's start location to read that later on CheckPoint */ + fdwxact->insert_start_lsn = ProcLastRecPtr; + + /* File is written completely, checkpoint can proceed with syncing */ + fdwxact->valid = true; + + /* Checkpoint can process now */ + MyProc->delayChkpt = false; + + END_CRIT_SECTION(); + + pfree(fdwxact_file_data); + return fdwxact; +} + +/* + * Insert a new entry for a given foreign transaction identified by transaction + * id, foreign server and user mapping, into the shared memory array. Caller + * must hold FdwXactLock in exclusive mode. + * + * If the entry already exists, the function raises an error. + */ +static FdwXact +insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdwxact_id) +{ + FdwXact fdwxact; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + /* Check for duplicated foreign transaction entry */ + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + fdwxact = FdwXactCtl->fdwxacts[i]; + if (fdwxact->valid && + fdwxact->dbid == dbid && + fdwxact->local_xid == xid && + fdwxact->serverid == serverid && + fdwxact->userid == userid) + ereport(ERROR, (errmsg("could not insert a foreign transaction entry"), + errdetail("Duplicate entry with transaction id %u, serverid %u, userid %u exists.", + xid, serverid, userid))); + } + + /* + * Get a next free foreign transaction entry. Raise error if there are + * none left. + */ + if (!FdwXactCtl->free_fdwxacts) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of foreign transactions reached"), + errhint("Increase max_prepared_foreign_transactions: \"%d\".", + max_prepared_foreign_xacts))); + } + fdwxact = FdwXactCtl->free_fdwxacts; + FdwXactCtl->free_fdwxacts = fdwxact->fdwxact_free_next; + + /* Insert the entry to shared memory array */ + Assert(FdwXactCtl->num_fdwxacts < max_prepared_foreign_xacts); + FdwXactCtl->fdwxacts[FdwXactCtl->num_fdwxacts++] = fdwxact; + + fdwxact->status = FDWXACT_STATUS_PREPARING; + fdwxact->local_xid = xid; + fdwxact->dbid = dbid; + fdwxact->serverid = serverid; + fdwxact->userid = userid; + fdwxact->umid = umid; + fdwxact->insert_start_lsn = InvalidXLogRecPtr; + fdwxact->insert_end_lsn = InvalidXLogRecPtr; + fdwxact->locking_backend = InvalidBackendId; + fdwxact->valid = false; + fdwxact->ondisk = false; + fdwxact->inredo = false; + memcpy(fdwxact->fdwxact_id, fdwxact_id, strlen(fdwxact_id) + 1); + + return fdwxact; +} + +/* + * Remove the foreign prepared transaction entry from shared memory. + * Caller must hold FdwXactLock in exclusive mode. + */ +static void +remove_fdwxact(FdwXact fdwxact) +{ + int i; + + Assert(fdwxact != NULL); + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + /* Search the slot where this entry resided */ + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + if (FdwXactCtl->fdwxacts[i] == fdwxact) + break; + } + + /* We did not find the given entry in the array */ + if (i >= FdwXactCtl->num_fdwxacts) + ereport(ERROR, + (errmsg("could not remove a foreign transaction entry"), + errdetail("Failed to find entry for xid %u, foreign server %u, and user %u.", + fdwxact->local_xid, fdwxact->serverid, fdwxact->userid))); + + elog(DEBUG2, "remove fdwxact entry id %s, xid %u db %d user %d", + fdwxact->fdwxact_id, fdwxact->local_xid, fdwxact->dbid, + fdwxact->userid); + + /* Remove the entry from active array */ + FdwXactCtl->num_fdwxacts--; + FdwXactCtl->fdwxacts[i] = FdwXactCtl->fdwxacts[FdwXactCtl->num_fdwxacts]; + + /* Put it back into free list */ + fdwxact->fdwxact_free_next = FdwXactCtl->free_fdwxacts; + FdwXactCtl->free_fdwxacts = fdwxact; + + /* Reset informations */ + fdwxact->status = FDWXACT_STATUS_INVALID; + fdwxact->locking_backend = InvalidBackendId; + fdwxact->valid = false; + fdwxact->ondisk = false; + fdwxact->inredo = false; + + if (!RecoveryInProgress()) + { + xl_fdwxact_remove record; + XLogRecPtr recptr; + + /* Fill up the log record before releasing the entry */ + record.serverid = fdwxact->serverid; + record.dbid = fdwxact->dbid; + record.xid = fdwxact->local_xid; + record.userid = fdwxact->userid; + + /* + * Now writing FdwXact state data to WAL. We have to set delayChkpt + * here, otherwise a checkpoint starting immediately after the WAL + * record is inserted could complete without fsync'ing our state file. + * (This is essentially the same kind of race condition as the + * COMMIT-to-clog-write case that RecordTransactionCommit uses + * delayChkpt for; see notes there.) + */ + START_CRIT_SECTION(); + + MyProc->delayChkpt = true; + + /* + * Log that we are removing the foreign transaction entry and remove + * the file from the disk as well. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &record, sizeof(xl_fdwxact_remove)); + recptr = XLogInsert(RM_FDWXACT_ID, XLOG_FDWXACT_REMOVE); + XLogFlush(recptr); + + /* Now we can mark ourselves as out of the commit critical section */ + MyProc->delayChkpt = false; + + END_CRIT_SECTION(); + } +} + +/* + * When the process exits, forget all the entries. + */ +static void +AtProcExit_FdwXact(int code, Datum arg) +{ + ForgetAllFdwXactParticipants(); +} + /* * The routine for committing or rolling back the given transaction participant. */ @@ -162,6 +682,7 @@ FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit) state.server = fdw_part->server; state.usermapping = fdw_part->usermapping; + state.fdwxact_id = NULL; state.flags = FDWXACT_FLAG_ONEPHASE; if (commit) @@ -181,14 +702,46 @@ FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit) } /* - * Clear the FdwXactParticipants list. + * Unlock foreign transaction participants and clear the FdwXactParticipants + * list. If we left foreign transaction, update the oldest xmin of unresolved + * transaction so that local transaction id of such unresolved foreign transaction + * is not truncated. */ static void ForgetAllFdwXactParticipants(void) { + ListCell *cell; + int nlefts = 0; + if (FdwXactParticipants == NIL) return; + foreach(cell, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(cell); + FdwXact fdwxact = fdw_part->fdwxact; + + /* Nothing to do if didn't register FdwXact entry yet */ + if (!fdwxact) + continue; + + /* Unlock the foreign transaction entry */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact->locking_backend = InvalidBackendId; + LWLockRelease(FdwXactLock); + nlefts++; + } + + /* + * If we leave any FdwXact entries, update the oldest local transaction of + * unresolved distributed transaction. + */ + if (nlefts > 0) + { + elog(DEBUG1, "left %u foreign transactions", nlefts); + FdwXactComputeRequiredXmin(); + } + list_free_deep(FdwXactParticipants); FdwXactParticipants = NIL; } @@ -211,23 +764,1202 @@ AtEOXact_FdwXact(bool is_commit) foreach(lc, FdwXactParticipants) { FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + FdwXact fdwxact = fdw_part->fdwxact; + + if (!fdwxact) + { + /* Commit or rollback the foreign transaction in one-phase */ + Assert(ServerSupportTransactionCallback(fdw_part)); + FdwXactParticipantEndTransaction(fdw_part, is_commit); + continue; + } + + /* + * This foreign transaction might have been prepared. In commit case, + * we don't need to do anything for this participant because all foreign + * transactions should have already been prepared and therefore the + * transaction already closed. These will be resolved manually. On the + * other hand in abort case, we need to close the transaction if + * preparing might be in-progress, since an error might have occurred + * on preparing a foreign transaction. + */ + if (!is_commit) + { + int status; - Assert(ServerSupportTransactionCallback(fdw_part)); - FdwXactParticipantEndTransaction(fdw_part, is_commit); + SpinLockAcquire(&(fdwxact->mutex)); + status = fdwxact->status; + fdwxact->status = FDWXACT_STATUS_ABORTING; + SpinLockRelease(&(fdwxact->mutex)); + + if (status == FDWXACT_STATUS_PREPARING) + FdwXactParticipantEndTransaction(fdw_part, false); + } } ForgetAllFdwXactParticipants(); } /* - * Check if the local transaction has any foreign transaction. + * Prepare foreign transactions by PREPARE TRANSACTION command. + * + * Note that it's possible that the transaction aborts after we prepared some + * of participants. In this case we change to rollback and rollback all foreign + * transactions. */ void PrePrepare_FdwXact(void) { - /* We don't support to prepare foreign transactions */ - if (FdwXactParticipants != NIL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has operated on foreign tables"))); + ListCell *lc; + TransactionId xid; + + if (FdwXactParticipants == NIL) + return; + + /* + * Check if there is a server that doesn't support two-phase commit. All + * involved servers need to support two-phase commit as we're going to + * prepare all of them. + */ + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (!ServerSupportTwophaseCommit(fdw_part)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a distributed transaction which has operated on a foreign server not supporting two-phase commit protocol"))); + } + + /* + * Assign a transaction id if not yet because the local transaction id + * is used to determine the result of the distributed transaction. And + * prepare all foreign transactions. + */ + xid = GetTopTransactionId(); + FdwXactPrepareForeignTransactions(xid); + + /* + * We keep FdwXactParticipants until the transaction end so that we change + * the involved foreign transactions to ABORTING in case of failure. + */ +} + +/* + * Resolve foreign transactions at the give indexes. + * + * The caller must hold the given foreign transactions in advance to prevent + * concurrent update. + */ +static void +FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts) +{ + for (int i = 0; i < nfdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[fdwxact_idxs[i]]; + + CHECK_FOR_INTERRUPTS(); + + FdwXactResolveOneFdwXact(fdwxact); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + if (fdwxact->ondisk) + RemoveFdwXactFile(fdwxact->dbid, fdwxact->local_xid, fdwxact->serverid, + fdwxact->userid, true); + remove_fdwxact(fdwxact); + LWLockRelease(FdwXactLock); + } +} + +/* + * Return true if there is at least one prepared foreign transaction + * which matches given arguments. + */ +bool +FdwXactExists(Oid dbid, Oid serverid, Oid userid) +{ + int idx; + + LWLockAcquire(FdwXactLock, LW_SHARED); + idx = get_fdwxact(dbid, InvalidTransactionId, serverid, userid); + LWLockRelease(FdwXactLock); + + return (idx >= 0); +} + +/* + * Return the index of first found FdwXact entry that matched to given arguments. + * Otherwise return -1. The search condition is defined by arguments with valid + * values for respective datatypes. + */ +static int +get_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid) +{ + bool found = false; + int i; + + Assert(LWLockHeldByMe(FdwXactLock)); + + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (!fdwxact->valid) + continue; + + /* dbid */ + if (OidIsValid(dbid) && fdwxact->dbid != dbid) + continue; + + /* xid */ + if (TransactionIdIsValid(xid) && xid != fdwxact->local_xid) + continue; + + /* serverid */ + if (OidIsValid(serverid) && serverid != fdwxact->serverid) + continue; + + /* userid */ + if (OidIsValid(userid) && fdwxact->userid != userid) + continue; + + /* This entry matches the condition */ + found = true; + break; + } + + return found ? i : -1; +} + +/* + * Compute the oldest xmin across all unresolved foreign transactions + * and store it in the ProcArray. + * + * XXX: we can exclude FdwXact entries whose status is already committing + * or aborting. + */ +static void +FdwXactComputeRequiredXmin(void) +{ + TransactionId agg_xmin = InvalidTransactionId; + + Assert(FdwXactCtl != NULL); + + LWLockAcquire(FdwXactLock, LW_SHARED); + + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (!fdwxact->valid) + continue; + + Assert(TransactionIdIsValid(fdwxact->local_xid)); + + if (!TransactionIdIsValid(agg_xmin) || + TransactionIdPrecedes(fdwxact->local_xid, agg_xmin)) + agg_xmin = fdwxact->local_xid; + } + + LWLockRelease(FdwXactLock); + + ProcArraySetFdwXactUnresolvedXmin(agg_xmin); +} + + +/* + * Return whether the foreign transaction associated with the given transaction + * id should be committed or rolled back according to the result of the local + * transaction. + */ +static FdwXactStatus +FdwXactGetTransactionFate(TransactionId xid) +{ + /* + * If the local transaction is already committed, commit prepared foreign + * transaction. + */ + if (TransactionIdDidCommit(xid)) + return FDWXACT_STATUS_COMMITTING; + + /* + * If the local transaction is already aborted, abort prepared foreign + * transactions. + */ + if (TransactionIdDidAbort(xid)) + return FDWXACT_STATUS_ABORTING; + + /* + * The local transaction is not in progress but the foreign transaction is + * not prepared on the foreign server. This can happen when transaction + * failed after registered this entry but before actual preparing on the + * foreign server. So let's assume it aborted. + */ + if (!TransactionIdIsInProgress(xid)) + return FDWXACT_STATUS_ABORTING; + + /* + * The Local transaction is in progress and foreign transaction is about + * to be committed or aborted. Raise an error anyway since we cannot + * determine the fate of this foreign transaction according to the local + * transaction whose fate is also not determined. + */ + elog(ERROR, + "cannot resolve the foreign transaction associated with in-process transaction"); + + pg_unreachable(); +} + +/* Commit or rollback one prepared foreign transaction */ +static void +FdwXactResolveOneFdwXact(FdwXact fdwxact) +{ + FdwXactRslvState state; + ForeignServer *server; + ForeignDataWrapper *fdw; + FdwRoutine *routine; + + /* The FdwXact entry must be held by me */ + Assert(fdwxact != NULL); + Assert(fdwxact->locking_backend == MyBackendId); + + if (fdwxact->status != FDWXACT_STATUS_COMMITTING && + fdwxact->status != FDWXACT_STATUS_ABORTING) + { + FdwXactStatus new_status; + + new_status = FdwXactGetTransactionFate(fdwxact->local_xid); + Assert(new_status == FDWXACT_STATUS_COMMITTING || + new_status == FDWXACT_STATUS_ABORTING); + + /* Update the status */ + SpinLockAcquire(&fdwxact->mutex); + fdwxact->status = new_status; + SpinLockRelease(&fdwxact->mutex); + } + + server = GetForeignServer(fdwxact->serverid); + fdw = GetForeignDataWrapper(server->fdwid); + routine = GetFdwRoutine(fdw->fdwhandler); + + /* Prepare the resolution state to pass to API */ + state.server = server; + state.usermapping = GetUserMapping(fdwxact->userid, fdwxact->serverid); + state.fdwxact_id = fdwxact->fdwxact_id; + state.flags = 0; + + if (fdwxact->status == FDWXACT_STATUS_COMMITTING) + { + routine->CommitForeignTransaction(&state); + elog(DEBUG1, "successfully committed the prepared foreign transaction for server %u user %u", + fdwxact->serverid, fdwxact->userid); + } + else + { + routine->RollbackForeignTransaction(&state); + elog(DEBUG1, "successfully rolled back the prepared foreign transaction for server %u user %u", + fdwxact->serverid, fdwxact->userid); + } +} + +/* Apply the redo log for a foreign transaction */ +void +fdwxact_redo(XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDWXACT_INSERT) + { + /* + * Add fdwxact entry and set start/end lsn of the WAL record in + * FdwXact entry. + */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + FdwXactRedoAdd(XLogRecGetData(record), + record->ReadRecPtr, + record->EndRecPtr); + LWLockRelease(FdwXactLock); + } + else if (info == XLOG_FDWXACT_REMOVE) + { + xl_fdwxact_remove *record = (xl_fdwxact_remove *) rec; + + /* Delete FdwXact entry and file if exists */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + FdwXactRedoRemove(record->dbid, record->xid, record->serverid, + record->userid, false); + LWLockRelease(FdwXactLock); + } + else + elog(ERROR, "invalid log type %d in foreign transaction log record", info); + + return; +} + + +/* + * Store pointer to the start/end of the WAL record along with the xid in + * a fdwxact entry in shared memory FdwXactData structure. + */ +static void +FdwXactRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + FdwXactOnDiskData *fdwxact_data = (FdwXactOnDiskData *) buf; + FdwXact fdwxact; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + Assert(RecoveryInProgress()); + + /* + * Add this entry into the table of foreign transactions. The status of + * the transaction is set as preparing, since we do not know the exact + * status right now. Resolver will set it later based on the status of + * local transaction which prepared this foreign transaction. + */ + fdwxact = insert_fdwxact(fdwxact_data->dbid, fdwxact_data->local_xid, + fdwxact_data->serverid, fdwxact_data->userid, + fdwxact_data->umid, fdwxact_data->fdwxact_id); + + elog(DEBUG2, "added fdwxact entry in shared memory for foreign transaction, db %u xid %u server %u user %u id %s", + fdwxact_data->dbid, fdwxact_data->local_xid, + fdwxact_data->serverid, fdwxact_data->userid, + fdwxact_data->fdwxact_id); + + /* + * Set status as PREPARED, since we do not know the xact status right now. + * We will set it later based on the status of local transaction that + * prepared this fdwxact entry. + */ + fdwxact->status = FDWXACT_STATUS_PREPARED; + fdwxact->insert_start_lsn = start_lsn; + fdwxact->insert_end_lsn = end_lsn; + fdwxact->inredo = true; /* added in redo */ + fdwxact->valid = false; + fdwxact->ondisk = XLogRecPtrIsInvalid(start_lsn); +} + +/* + * Remove the corresponding fdwxact entry from FdwXactCtl. Also remove + * FdwXact file if a foreign transaction was saved via an earlier checkpoint. + * We could not found the FdwXact entry in the case where a crash recovery + * starts from the point where is after added but before removed the entry. + */ +static void +FdwXactRedoRemove(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, bool givewarning) +{ + FdwXact fdwxact; + int i; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + Assert(RecoveryInProgress()); + + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + fdwxact = FdwXactCtl->fdwxacts[i]; + + if (fdwxact->dbid == dbid && fdwxact->local_xid == xid && + fdwxact->serverid == serverid && fdwxact->userid == userid) + break; + } + + if (i >= FdwXactCtl->num_fdwxacts) + return; + + /* Clean up entry and any files we may have left */ + if (fdwxact->ondisk) + RemoveFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + givewarning); + remove_fdwxact(fdwxact); + + elog(DEBUG2, "removed fdwxact entry from shared memory for foreign transaction, db %u xid %u server %u user %u id %s", + fdwxact->dbid, fdwxact->local_xid, fdwxact->serverid, + fdwxact->userid, fdwxact->fdwxact_id); +} + +/* + * We must fsync the foreign transaction state file that is valid or generated + * during redo and has a inserted LSN <= the checkpoint's redo horizon. + * The foreign transaction entries and hence the corresponding files are expected + * to be very short-lived. By executing this function at the end, we might have + * lesser files to fsync, thus reducing some I/O. This is similar to + * CheckPointTwoPhase(). + * + * This is deliberately run as late as possible in the checkpoint sequence, + * because FdwXacts ordinarily have short lifespans, and so it is quite + * possible that FdwXacts that were valid at checkpoint start will no longer + * exist if we wait a little bit. With typical checkpoint settings this + * will be about 3 minutes for an online checkpoint, so as a result we + * expect that there will be no FdwXacts that need to be copied to disk. + * + * If a FdwXact remains valid across multiple checkpoints, it will already + * be on disk so we don't bother to repeat that write. + */ +void +CheckPointFdwXacts(XLogRecPtr redo_horizon) +{ + int cnt; + int serialized_fdwxacts = 0; + + if (max_prepared_foreign_xacts == 0) + return; /* nothing to do */ + + /* + * We are expecting there to be zero FdwXact that need to be copied to + * disk, so we perform all I/O while holding FdwXactLock for simplicity. + * This presents any new foreign xacts from preparing while this occurs, + * which shouldn't be a problem since the presence of long-lived prepared + * foreign xacts indicated the transaction manager isn't active. + * + * It's also possible to move I/O out of the lock, but on every error we + * should check whether somebody committed our transaction in different + * backend. Let's leave this optimisation for future, if somebody will + * spot that this place cause bottleneck. + * + * Note that it isn't possible for there to be a FdwXact with a + * insert_end_lsn set prior to the last checkpoint yet is marked invalid, + * because of the efforts with delayChkpt. + */ + LWLockAcquire(FdwXactLock, LW_SHARED); + for (cnt = 0; cnt < FdwXactCtl->num_fdwxacts; cnt++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[cnt]; + + if ((fdwxact->valid || fdwxact->inredo) && + !fdwxact->ondisk && + fdwxact->insert_end_lsn <= redo_horizon) + { + char *buf; + int len; + + XlogReadFdwXactData(fdwxact->insert_start_lsn, &buf, &len); + RecreateFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + buf, len); + fdwxact->ondisk = true; + fdwxact->insert_start_lsn = InvalidXLogRecPtr; + fdwxact->insert_end_lsn = InvalidXLogRecPtr; + pfree(buf); + serialized_fdwxacts++; + } + } + + LWLockRelease(FdwXactLock); + + /* + * Flush unconditionally the parent directory to make any information + * durable on disk. FdwXact files could have been removed and those + * removals need to be made persistent as well as any files newly created. + */ + fsync_fname(FDWXACTS_DIR, true); + + if (log_checkpoints && serialized_fdwxacts > 0) + ereport(LOG, + (errmsg_plural("%u foreign transaction state file was written " + "for long-running prepared transactions", + "%u foreign transaction state files were written " + "for long-running prepared transactions", + serialized_fdwxacts, + serialized_fdwxacts))); +} + +/* + * Reads foreign transaction data from xlog. During checkpoint this data will + * be moved to fdwxact files and ReadFdwXactFile should be used instead. + * + * Note clearly that this function accesses WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. It does not run during + * crash recovery or standby processing. + */ +static void +XlogReadFdwXactData(XLogRecPtr lsn, char **buf, int *len) +{ + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor."))); + + XLogBeginRead(xlogreader, lsn); + record = XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read foreign transaction state from xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (XLogRecGetRmid(xlogreader) != RM_FDWXACT_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_FDWXACT_INSERT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("expected foreign transaction state data is not present in xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (len != NULL) + *len = XLogRecGetDataLen(xlogreader); + + *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader)); + memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + + XLogReaderFree(xlogreader); +} + +/* + * Recreates a foreign transaction state file. This is used in WAL replay + * and during checkpoint creation. + * + * Note: content and len don't include CRC. + */ +void +RecreateFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, void *content, int len) +{ + char path[MAXPGPATH]; + pg_crc32c statefile_crc; + int fd; + + /* Recompute CRC */ + INIT_CRC32C(statefile_crc); + COMP_CRC32C(statefile_crc, content, len); + FIN_CRC32C(statefile_crc); + + FdwXactFilePath(path, dbid, xid, serverid, userid); + + fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not recreate foreign transaction state file \"%s\": %m", + path))); + + /* Write content and CRC */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_WRITE); + if (write(fd, content, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction state file: %m"))); + } + if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c)) + { + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction state file: %m"))); + } + pgstat_report_wait_end(); + + /* + * We must fsync the file because the end-of-replay checkpoint will not do + * so, there being no FDWXACT in shared memory yet to tell it to. + */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_SYNC); + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync foreign transaction state file: %m"))); + pgstat_report_wait_end(); + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close foreign transaction file: %m"))); +} + +/* + * Given a transaction id, userid and serverid read it either from disk + * or read it directly via shmem xlog record pointer using the provided + * "insert_start_lsn". + */ +static char * +ProcessFdwXactBuffer(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, XLogRecPtr insert_start_lsn, bool fromdisk) +{ + TransactionId origNextXid = + XidFromFullTransactionId(ShmemVariableCache->nextXid); + char *buf; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + if (!fromdisk) + Assert(!XLogRecPtrIsInvalid(insert_start_lsn)); + + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing future fdwxact state file for xid %u, server %u and user %u", + xid, serverid, userid))); + RemoveFdwXactFile(dbid, xid, serverid, userid, true); + } + else + { + ereport(WARNING, + (errmsg("removing future fdwxact state from memory for xid %u, server %u and user %u", + xid, serverid, userid))); + FdwXactRedoRemove(dbid, xid, serverid, userid, true); + } + return NULL; + } + + if (fromdisk) + { + /* Read and validate file */ + buf = ReadFdwXactFile(dbid, xid, serverid, userid); + } + else + { + /* Read xlog data */ + XlogReadFdwXactData(insert_start_lsn, &buf, NULL); + } + + return buf; +} + +/* + * Read and validate the foreign transaction state file. + * + * If it looks OK (has a valid magic number and CRC), return the palloc'd + * contents of the file, issuing an error when finding corrupted data. + * This state can be reached when doing recovery. + */ +static char * +ReadFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid) +{ + char path[MAXPGPATH]; + int fd; + FdwXactOnDiskData *fdwxact_file_data; + struct stat stat; + uint32 crc_offset; + pg_crc32c calc_crc; + pg_crc32c file_crc; + char *buf; + int r; + + FdwXactFilePath(path, dbid, xid, serverid, userid); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open FDW transaction state file \"%s\": %m", + path))); + + /* + * Check file length. We can determine a lower bound pretty easily. We + * set an upper bound to avoid palloc() failure on a corrupt file, though + * we can't guarantee that we won't get an out of memory error anyway, + * even on a valid file. + */ + if (fstat(fd, &stat)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat FDW transaction state file \"%s\": %m", + path))); + + if (stat.st_size < (offsetof(FdwXactOnDiskData, fdwxact_id) + + sizeof(pg_crc32c)) || + stat.st_size > MaxAllocSize) + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("too large FDW transaction state file \"%s\": %m", + path))); + + crc_offset = stat.st_size - sizeof(pg_crc32c); + if (crc_offset != MAXALIGN(crc_offset)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("incorrect alignment of CRC offset for file \"%s\"", + path))); + + /* + * Ok, slurp in the file. + */ + buf = (char *) palloc(stat.st_size); + fdwxact_file_data = (FdwXactOnDiskData *) buf; + + /* Slurp the file */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_READ); + r = read(fd, buf, stat.st_size); + if (r != stat.st_size) + { + if (r < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + else + ereport(ERROR, + (errmsg("could not read file \"%s\": read %d of %zu", + path, r, (Size) stat.st_size))); + } + pgstat_report_wait_end(); + + if (CloseTransientFile(fd)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); + + /* + * Check the CRC. + */ + INIT_CRC32C(calc_crc); + COMP_CRC32C(calc_crc, buf, crc_offset); + FIN_CRC32C(calc_crc); + + file_crc = *((pg_crc32c *) (buf + crc_offset)); + + if (!EQ_CRC32C(calc_crc, file_crc)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("calculated CRC checksum does not match value stored in file \"%s\"", + path))); + + /* Check if the contents is an expected data */ + fdwxact_file_data = (FdwXactOnDiskData *) buf; + if (fdwxact_file_data->dbid != dbid || + fdwxact_file_data->serverid != serverid || + fdwxact_file_data->userid != userid || + fdwxact_file_data->local_xid != xid) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid foreign transaction state file \"%s\"", + path))); + + return buf; +} + +/* + * Scan the shared memory entries of FdwXact and determine the range of valid + * XIDs present. This is run during database startup, after we have completed + * reading WAL. ShmemVariableCache->nextXid has been set to one more than + * the highest XID for which evidence exists in WAL. + + * On corrupted two-phase files, fail immediately. Keeping around broken + * entries and let replay continue causes harm on the system, and a new + * backup should be rolled in. + + * Our other responsibility is to update and return the oldest valid XID + * among the distributed transactions. This is needed to synchronize pg_subtrans + * startup properly. + */ +TransactionId +PrescanFdwXacts(TransactionId oldestActiveXid) +{ + FullTransactionId nextXid = ShmemVariableCache->nextXid; + TransactionId origNextXid = XidFromFullTransactionId(nextXid); + TransactionId result = origNextXid; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + char *buf; + + buf = ProcessFdwXactBuffer(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + fdwxact->insert_start_lsn, fdwxact->ondisk); + + if (buf == NULL) + continue; + + if (TransactionIdPrecedes(fdwxact->local_xid, result)) + result = fdwxact->local_xid; + + pfree(buf); + } + LWLockRelease(FdwXactLock); + + return result; +} + +/* + * Scan pg_fdwxact and fill FdwXact depending on the on-disk data. + * This is called once at the beginning of recovery, saving any extra + * lookups in the future. FdwXact files that are newer than the + * minimum XID horizon are discarded on the way. + */ +void +RestoreFdwXactData(void) +{ + DIR *cldir; + struct dirent *clde; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + cldir = AllocateDir(FDWXACTS_DIR); + while ((clde = ReadDir(cldir, FDWXACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDWXACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDWXACT_FILE_NAME_LEN) + { + TransactionId local_xid; + Oid dbid; + Oid serverid; + Oid userid; + char *buf; + + sscanf(clde->d_name, "%08x_%08x_%08x_%08x", + &dbid, &local_xid, &serverid, &userid); + + /* Read fdwxact data from disk */ + buf = ProcessFdwXactBuffer(dbid, local_xid, serverid, userid, + InvalidXLogRecPtr, true); + + if (buf == NULL) + continue; + + /* Add this entry into the table of foreign transactions */ + FdwXactRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + } + } + + LWLockRelease(FdwXactLock); + FreeDir(cldir); +} + +/* + * Remove the foreign transaction file for given entry. + * + * If giveWarning is false, do not complain about file-not-present; + * this is an expected case during WAL replay. + */ +static void +RemoveFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + bool giveWarning) +{ + char path[MAXPGPATH]; + + FdwXactFilePath(path, dbid, xid, serverid, userid); + if (unlink(path) < 0 && (errno != ENOENT || giveWarning)) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove foreign transaction state file \"%s\": %m", + path))); +} + +/* + * Scan the shared memory entries of FdwXact and valid them. + * + * This is run at the end of recovery, but before we allow backends to write + * WAL. + */ +void +RecoverFdwXacts(void) +{ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + char *buf; + + buf = ProcessFdwXactBuffer(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + fdwxact->insert_start_lsn, fdwxact->ondisk); + + if (buf == NULL) + continue; + + ereport(LOG, + (errmsg("recovering foreign prepared transaction %u for server %u and user %u from shared memory", + fdwxact->local_xid, fdwxact->serverid, fdwxact->userid))); + + /* recovered, so reset the flag for entries generated by redo */ + fdwxact->inredo = false; + fdwxact->valid = true; + pfree(buf); + } + LWLockRelease(FdwXactLock); +} + +/* Built in functions */ + +/* + * Structure to hold and iterate over the foreign transactions to be displayed + * by the built-in functions. + */ +typedef struct +{ + FdwXact fdwxacts; + int num_xacts; + int cur_xact; +} WorkingStatus; + +Datum +pg_foreign_xacts(PG_FUNCTION_ARGS) +{ +#define PG_PREPARED_FDWXACTS_COLS 6 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + LWLockAcquire(FdwXactLock, LW_SHARED); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + FdwXactStatus status; + char *xact_status; + Datum values[PG_PREPARED_FDWXACTS_COLS]; + bool nulls[PG_PREPARED_FDWXACTS_COLS]; + + if (!fdwxact->valid) + continue; + + memset(nulls, 0, sizeof(nulls)); + + SpinLockAcquire(&fdwxact->mutex); + status = fdwxact->status; + SpinLockRelease(&fdwxact->mutex); + + values[0] = TransactionIdGetDatum(fdwxact->local_xid); + values[1] = ObjectIdGetDatum(fdwxact->serverid); + values[2] = ObjectIdGetDatum(fdwxact->userid); + + switch (status) + { + case FDWXACT_STATUS_PREPARING: + xact_status = "preparing"; + break; + case FDWXACT_STATUS_PREPARED: + xact_status = "prepared"; + break; + case FDWXACT_STATUS_COMMITTING: + xact_status = "committing"; + break; + case FDWXACT_STATUS_ABORTING: + xact_status = "aborting"; + break; + default: + xact_status = "unknown"; + break; + } + + values[3] = CStringGetTextDatum(xact_status); + values[4] = CStringGetTextDatum(fdwxact->fdwxact_id); + + if (fdwxact->locking_backend != InvalidBackendId) + { + PGPROC *locker = BackendIdGetProc(fdwxact->locking_backend); + values[5] = Int32GetDatum(locker->pid); + } + else + nulls[5] = true; + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + LWLockRelease(FdwXactLock); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Built-in SQL function to resolve a prepared foreign transaction. + */ +Datum +pg_resolve_foreign_xact(PG_FUNCTION_ARGS) +{ + TransactionId xid = DatumGetTransactionId(PG_GETARG_DATUM(0)); + Oid serverid = PG_GETARG_OID(1); + Oid userid = PG_GETARG_OID(2); + Oid myuserid; + FdwXact fdwxact; + int idx; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + idx = get_fdwxact(MyDatabaseId, xid, serverid, userid); + + if (idx < 0) + { + /* not found */ + LWLockRelease(FdwXactLock); + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("does not exist foreign transaction"))); + } + + fdwxact = FdwXactCtl->fdwxacts[idx]; + + myuserid = GetUserId(); + if (myuserid != fdwxact->userid && !superuser_arg(myuserid)) + ereport(ERROR, + (errmsg("permission denied to resolve prepared foreign transaction"), + errhint("Must be superuser or the user that prepared the transaction"))); + + if (fdwxact->locking_backend != InvalidBackendId) + { + /* the entry is being processed by someone */ + LWLockRelease(FdwXactLock); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("foreign transaction with transaction identifier \"%s\" is busy", + fdwxact->fdwxact_id))); + } + + if (TwoPhaseExists(fdwxact->local_xid)) + { + /* + * the entry's local transaction is prepared. Since we cannot know the + * fate of the local transaction, we cannot resolve this foreign + * transaction. + */ + LWLockRelease(FdwXactLock); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot resolve foreign transaction with identifier \"%s\" whose local transaction is in-progress", + fdwxact->fdwxact_id), + errhint("Do COMMIT PREPARED or ROLLBACK PREPARED"))); + } + + /* Hold the entry */ + fdwxact->locking_backend = MyBackendId; + + LWLockRelease(FdwXactLock); + + PG_TRY(); + { + FdwXactResolveFdwXacts(&idx, 1); + } + PG_CATCH(); + { + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + FdwXactCtl->fdwxacts[idx]->locking_backend = InvalidBackendId; + LWLockRelease(FdwXactLock); + + PG_RE_THROW(); + } + PG_END_TRY(); + + PG_RETURN_BOOL(true); +} + +/* + * Built-in function to remove a prepared foreign transaction entry without + * resolution. The function gives a way to forget about such prepared + * transaction in case: the foreign server where it is prepared is no longer + * available, the user which prepared this transaction needs to be dropped. + */ +Datum +pg_remove_foreign_xact(PG_FUNCTION_ARGS) +{ + TransactionId xid = DatumGetTransactionId(PG_GETARG_DATUM(0)); + Oid serverid = PG_GETARG_OID(1); + Oid userid = PG_GETARG_OID(2); + Oid myuserid; + FdwXact fdwxact; + int idx; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to remove foreign transactions")))); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + idx = get_fdwxact(MyDatabaseId, xid, serverid, userid); + + if (idx < 0) + { + /* not found */ + LWLockRelease(FdwXactLock); + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("does not exist foreign transaction on server %u", + serverid))); + } + + fdwxact = FdwXactCtl->fdwxacts[idx]; + + myuserid = GetUserId(); + if (myuserid != fdwxact->userid && !superuser_arg(myuserid)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("permission denied to remove prepared foreign transaction"), + errhint("Must be superuser or the user that prepared the transaction")))); + + if (fdwxact->locking_backend != InvalidBackendId) + { + /* the entry is being held by someone */ + LWLockRelease(FdwXactLock); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("foreign transaction with transaction id %u, server %u, and user %u is busy", + xid, serverid, userid))); + } + + /* Hold the entry */ + fdwxact->locking_backend = MyBackendId; + + PG_TRY(); + { + /* Clean up entry and any files we may have left */ + if (fdwxact->ondisk) + RemoveFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + true); + remove_fdwxact(fdwxact); + } + PG_CATCH(); + { + if (fdwxact->valid) + { + Assert(fdwxact->locking_backend == MyBackendId); + fdwxact->locking_backend = InvalidBackendId; + } + LWLockRelease(FdwXactLock); + PG_RE_THROW(); + } + PG_END_TRY(); + + LWLockRelease(FdwXactLock); + + PG_RETURN_BOOL(true); } diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..982c1a36cc 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -13,6 +13,7 @@ OBJS = \ clogdesc.o \ committsdesc.o \ dbasedesc.o \ + fdwxactdesc.o \ genericdesc.o \ gindesc.o \ gistdesc.o \ diff --git a/src/backend/access/rmgrdesc/fdwxactdesc.c b/src/backend/access/rmgrdesc/fdwxactdesc.c new file mode 100644 index 0000000000..ca761763e5 --- /dev/null +++ b/src/backend/access/rmgrdesc/fdwxactdesc.c @@ -0,0 +1,58 @@ +/*------------------------------------------------------------------------- + * + * fdwxactdesc.c + * PostgreSQL global transaction manager for foreign server. + * + * This module describes the WAL records for foreign transaction manager. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/backend/access/rmgrdesc/fdwxactdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdwxact_xlog.h" + +void +fdwxact_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDWXACT_INSERT) + { + FdwXactOnDiskData *fdwxact_insert = (FdwXactOnDiskData *) rec; + + appendStringInfo(buf, "server: %u,", fdwxact_insert->serverid); + appendStringInfo(buf, " user: %u,", fdwxact_insert->userid); + appendStringInfo(buf, " database: %u,", fdwxact_insert->dbid); + appendStringInfo(buf, " local xid: %u,", fdwxact_insert->local_xid); + appendStringInfo(buf, " id: %s", fdwxact_insert->fdwxact_id); + } + else + { + xl_fdwxact_remove *fdwxact_remove = (xl_fdwxact_remove *) rec; + + appendStringInfo(buf, "server: %u,", fdwxact_remove->serverid); + appendStringInfo(buf, " user: %u,", fdwxact_remove->userid); + appendStringInfo(buf, " database: %u,", fdwxact_remove->dbid); + appendStringInfo(buf, " local xid: %u", fdwxact_remove->xid); + } + +} + +const char * +fdwxact_identify(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_FDWXACT_INSERT: + return "NEW FOREIGN TRANSACTION"; + case XLOG_FDWXACT_REMOVE: + return "REMOVE FOREIGN TRANSACTION"; + } + /* Keep compiler happy */ + return NULL; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 92cc7ea073..e4ae79e599 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -114,7 +114,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "max_connections=%d max_worker_processes=%d " "max_wal_senders=%d max_prepared_xacts=%d " "max_locks_per_xact=%d wal_level=%s " - "wal_log_hints=%s track_commit_timestamp=%s", + "wal_log_hints=%s track_commit_timestamp=%s " + "max_prepared_foreign_transactions=%d", xlrec.MaxConnections, xlrec.max_worker_processes, xlrec.max_wal_senders, @@ -122,7 +123,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) xlrec.max_locks_per_xact, wal_level_str, xlrec.wal_log_hints ? "on" : "off", - xlrec.track_commit_timestamp ? "on" : "off"); + xlrec.track_commit_timestamp ? "on" : "off", + xlrec.max_prepared_foreign_xacts); } else if (info == XLOG_FPW_CHANGE) { diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b52..0a3f4b383f 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -10,6 +10,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact_xlog.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index fc18b77832..5c8a55358d 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -845,6 +845,34 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) return result; } +/* + * TwoPhaseExists + * Return true if there is a prepared transaction specified by XID + */ +bool +TwoPhaseExists(TransactionId xid) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->xid == xid) + { + found = true; + break; + } + } + + LWLockRelease(TwoPhaseStateLock); + + return found; +} + /* * TwoPhaseGetDummyBackendId * Get the dummy backend ID for prepared transaction specified by XID diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b8990af8b6..a87f6b5abf 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2568,6 +2568,7 @@ PrepareTransaction(void) PostPrepare_Twophase(); /* PREPARE acts the same as COMMIT as far as GUC is concerned */ + AtEOXact_FdwXact(true); AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_Enum(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b18257c198..17773d38da 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -24,6 +24,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/heaptoast.h" #include "access/multixact.h" #include "access/rewriteheap.h" @@ -4629,6 +4630,7 @@ InitControlFile(uint64 sysidentifier) ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_wal_senders = max_wal_senders; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -6314,6 +6316,9 @@ CheckRequiredParameterValues(void) RecoveryRequiresIntParameter("max_prepared_transactions", max_prepared_xacts, ControlFile->max_prepared_xacts); + RecoveryRequiresIntParameter("max_prepared_foreign_transactions", + max_prepared_foreign_xacts, + ControlFile->max_prepared_foreign_xacts); RecoveryRequiresIntParameter("max_locks_per_transaction", max_locks_per_xact, ControlFile->max_locks_per_xact); @@ -6861,14 +6866,15 @@ StartupXLOG(void) restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI); /* - * Before running in recovery, scan pg_twophase and fill in its status to - * be able to work on entries generated by redo. Doing a scan before - * taking any recovery action has the merit to discard any 2PC files that - * are newer than the first record to replay, saving from any conflicts at - * replay. This avoids as well any subsequent scans when doing recovery - * of the on-disk two-phase data. + * Before running in recovery, scan pg_twophase and pg_fdwxacts, and then + * fill in its status to be able to work on entries generated by redo. + * Doing a scan before taking any recovery action has the merit to discard + * any state files that are newer than the first record to replay, saving + * from any conflicts at replay. This avoids as well any subsequent scans + * when doing recovery of the on-disk two-phase or fdwxact data. */ restoreTwoPhaseData(); + RestoreFdwXactData(); lastFullPageWrites = checkPoint.fullPageWrites; @@ -7070,7 +7076,10 @@ StartupXLOG(void) InitRecoveryTransactionEnvironment(); if (wasShutdown) + { oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); + } else oldestActiveXID = checkPoint.oldestActiveXid; Assert(TransactionIdIsValid(oldestActiveXID)); @@ -7582,11 +7591,13 @@ StartupXLOG(void) } /* - * Pre-scan prepared transactions to find out the range of XIDs present. - * This information is not quite needed yet, but it is positioned here so - * as potential problems are detected before any on-disk change is done. + * Pre-scan prepared transactions and foreign prepared transacftions to find + * out the range of XIDs present. This information is not quite needed yet, + * but it is positioned here so as potential problems are detected before any + * on-disk change is done. */ oldestActiveXID = PrescanPreparedTransactions(NULL, NULL); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); /* * Consider whether we need to assign a new timeline ID. @@ -7914,8 +7925,12 @@ StartupXLOG(void) TrimCLOG(); TrimMultiXact(); - /* Reload shared-memory state for prepared transactions */ + /* + * Reload shared-memory state for prepared transactions and foreign + * prepared transactions. + */ RecoverPreparedTransactions(); + RecoverFdwXacts(); /* * Shutdown the recovery environment. This must occur after @@ -9281,6 +9296,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); + CheckPointFdwXacts(checkPointRedo); } /* @@ -9829,6 +9845,7 @@ XLogReportParameters(void) max_worker_processes != ControlFile->max_worker_processes || max_wal_senders != ControlFile->max_wal_senders || max_prepared_xacts != ControlFile->max_prepared_xacts || + max_prepared_foreign_xacts != ControlFile->max_prepared_foreign_xacts || max_locks_per_xact != ControlFile->max_locks_per_xact || track_commit_timestamp != ControlFile->track_commit_timestamp) { @@ -9848,6 +9865,7 @@ XLogReportParameters(void) xlrec.max_worker_processes = max_worker_processes; xlrec.max_wal_senders = max_wal_senders; xlrec.max_prepared_xacts = max_prepared_xacts; + xlrec.max_prepared_foreign_xacts = max_prepared_foreign_xacts; xlrec.max_locks_per_xact = max_locks_per_xact; xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; @@ -9866,6 +9884,7 @@ XLogReportParameters(void) ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_wal_senders = max_wal_senders; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -10073,6 +10092,7 @@ xlog_redo(XLogReaderState *record) RunningTransactionsData running; oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); /* * Construct a RunningTransactions snapshot representing a shut @@ -10276,6 +10296,7 @@ xlog_redo(XLogReaderState *record) ControlFile->max_worker_processes = xlrec.max_worker_processes; ControlFile->max_wal_senders = xlrec.max_wal_senders; ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = xlrec.max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5d89e77dbe..c134c5a253 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -333,6 +333,9 @@ CREATE VIEW pg_prepared_xacts AS CREATE VIEW pg_prepared_statements AS SELECT * FROM pg_prepared_statement() AS P; +CREATE VIEW pg_foreign_xacts AS + SELECT * FROM pg_foreign_xacts() AS F; + CREATE VIEW pg_seclabels AS SELECT l.objoid, l.classoid, l.objsubid, diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index ec024fa106..492627caa1 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -13,6 +13,7 @@ */ #include "postgres.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/reloptions.h" #include "access/table.h" @@ -1076,6 +1077,18 @@ RemoveForeignServerById(Oid srvId) if (!HeapTupleIsValid(tp)) elog(ERROR, "cache lookup failed for foreign server %u", srvId); + /* + * We cannot drop the foreign server if there is a foreign prepared + * transaction with this foreign server, + */ + if (FdwXactExists(MyDatabaseId, srvId, InvalidOid)) + { + Form_pg_foreign_server srvForm = (Form_pg_foreign_server) GETSTRUCT(tp); + ereport(WARNING, + (errmsg("server \"%s\" has unresolved prepared transactions on it", + NameStr(srvForm->srvname)))); + } + CatalogTupleDelete(rel, &tp->t_self); ReleaseSysCache(tp); @@ -1396,6 +1409,15 @@ RemoveUserMapping(DropUserMappingStmt *stmt) user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername); + /* + * We cannot drop the user mapping if there is a foreign prepared + * transaction with this user mapping. + */ + if (FdwXactExists(MyDatabaseId, srv->serverid, useId)) + ereport(WARNING, + (errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"", + srv->servername, MappingUserName(useId)))); + /* * Do the deletion */ diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index d50dc099c6..cfddb5d854 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -332,6 +332,12 @@ GetFdwRoutine(Oid fdwhandler) Assert((routine->CommitForeignTransaction && routine->RollbackForeignTransaction) || (!routine->CommitForeignTransaction && !routine->RollbackForeignTransaction)); + /* FDW supporting prepare API must support also commit and rollback APIs */ + Assert((routine->PrepareForeignTransaction && + routine->CommitForeignTransaction && + routine->RollbackForeignTransaction) || + !routine->PrepareForeignTransaction); + return routine; } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3f24a33ef1..c34d14bab8 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4158,6 +4158,15 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_DSM_FILL_ZERO_WRITE: event_name = "DSMFillZeroWrite"; break; + case WAIT_EVENT_FDWXACT_FILE_WRITE: + event_name = "FdwXactFileWrite"; + break; + case WAIT_EVENT_FDWXACT_FILE_READ: + event_name = "FdwXactFileRead"; + break; + case WAIT_EVENT_FDWXACT_FILE_SYNC: + event_name = "FdwXactFileSync"; + break; case WAIT_EVENT_LOCK_FILE_ADDTODATADIR_READ: event_name = "LockFileAddToDataDirRead"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 7de27ee4e0..9e11bf3822 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -93,6 +93,7 @@ #include #endif +#include "access/fdwxact.h" #include "access/transam.h" #include "access/xlog.h" #include "bootstrap/bootstrap.h" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index afa1df00d0..d897f2c5fc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -178,6 +178,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: case RM_GENERIC_ID: + case RM_FDWXACT_ID: /* just deal with xid, and done */ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index f9bbe97b50..6f14a950bf 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, FdwXactShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -267,6 +269,7 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + FdwXactShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index cf12eda504..ba6d6c7c2d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -96,6 +96,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* local transaction id of oldest unresolved distributed transaction */ + TransactionId fdwxact_unresolved_xmin; /* indexes into allProcs[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -187,11 +189,13 @@ typedef struct ComputeXidHorizonsResult FullTransactionId latest_completed; /* - * The same for procArray->replication_slot_xmin and. - * procArray->replication_slot_catalog_xmin. + * The same for procArray->replication_slot_xmin, + * procArray->replication_slot_catalog_xmin, and + * procArray->fdwxact_unresolved_xmin. */ TransactionId slot_xmin; TransactionId slot_catalog_xmin; + TransactionId fdwxact_unresolved_xmin; /* * Oldest xid that any backend might still consider running. This needs to @@ -210,8 +214,9 @@ typedef struct ComputeXidHorizonsResult * Oldest xid for which deleted tuples need to be retained in shared * tables. * - * This includes the effects of replication slots. If that's not desired, - * look at shared_oldest_nonremovable_raw; + * This includes the effects of replication slots as unresolved + * foreign transactions. If that's not desired, look at + * shared_oldest_nonremovable_raw; */ TransactionId shared_oldest_nonremovable; @@ -418,6 +423,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->fdwxact_unresolved_xmin = InvalidTransactionId; ShmemVariableCache->xactCompletionCount = 1; } @@ -1709,6 +1715,7 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) */ h->slot_xmin = procArray->replication_slot_xmin; h->slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + h->fdwxact_unresolved_xmin = procArray->fdwxact_unresolved_xmin; for (int index = 0; index < arrayP->numProcs; index++) { @@ -1836,6 +1843,12 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->data_oldest_nonremovable = TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin); + /* + * Check whether there are unresolved distributed transaction requiring + * an older xmin. + */ + h->shared_oldest_nonremovable = + TransactionIdOlder(h->data_oldest_nonremovable, h->fdwxact_unresolved_xmin); /* * The only difference between catalog / data horizons is that the slot's * catalog xmin is applied to the catalog one (so catalogs can be accessed @@ -1893,6 +1906,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) Assert(!TransactionIdIsValid(h->slot_catalog_xmin) || TransactionIdPrecedesOrEquals(h->oldest_considered_running, h->slot_catalog_xmin)); + Assert(!TransactionIdIsValid(h->fdwxact_unresolved_xmin) || + TransactionIdPrecedesOrEquals(h->oldest_considered_running, + h->fdwxact_unresolved_xmin)); /* update approximate horizons with the computed horizons */ GlobalVisUpdateApply(h); @@ -3804,6 +3820,38 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetFdwXactUnresolvedXmin + * + * Install limits to future computations of the xmin horizon to prevent + * vacuum clog from affected transactions needed by resolving distributed + * transaction. + */ +void +ProcArraySetFdwXactUnresolvedXmin(TransactionId xmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + procArray->fdwxact_unresolved_xmin = xmin; + LWLockRelease(ProcArrayLock); +} + +/* + * ProcArrayGetFdwXactUnresolvedXmin + * + * Return the current unresolved xmin limits. + */ +TransactionId +ProcArrayGetFdwXactUnresolvedXmin(void) +{ + TransactionId xmin; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = procArray->fdwxact_unresolved_xmin; + LWLockRelease(ProcArrayLock); + + return xmin; +} + /* * XidCacheRemoveRunningXids * diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 774292fd94..dc29a7ea6f 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -53,3 +53,4 @@ XactTruncationLock 44 # 45 was XactTruncationLock until removal of BackendRandomLock WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 +FdwXactLock 48 diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 17579eeaca..9c78b2a90a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -30,6 +30,7 @@ #include #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/gin.h" #include "access/rmgr.h" #include "access/tableam.h" @@ -2470,6 +2471,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."), + NULL + }, + &max_prepared_foreign_xacts, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 8930a94fff..68548b4633 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -127,6 +127,8 @@ #temp_buffers = 8MB # min 800kB #max_prepared_transactions = 0 # zero disables the feature # (change requires restart) +#max_prepared_foreign_transactions = 0 # zero disables the feature + # (change requires restart) # Caution: it is not advisable to set max_prepared_transactions nonzero unless # you actively intend to use prepared transactions. #work_mem = 4MB # min 64kB diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index c854221a30..db9fb14623 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -204,6 +204,7 @@ static const char *const subdirs[] = { "pg_wal/archive_status", "pg_commit_ts", "pg_dynshmem", + "pg_fdwxact", "pg_notify", "pg_serial", "pg_snapshots", diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 3e00ac0f70..53bc3d82d7 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -302,6 +302,8 @@ main(int argc, char *argv[]) ControlFile->max_wal_senders); printf(_("max_prepared_xacts setting: %d\n"), ControlFile->max_prepared_xacts); + printf(_("max_prepared_foreign_transactions setting: %d\n"), + ControlFile->max_prepared_foreign_xacts); printf(_("max_locks_per_xact setting: %d\n"), ControlFile->max_locks_per_xact); printf(_("track_commit_timestamp setting: %s\n"), diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 805dafef07..dd70a0f8a2 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -710,6 +710,7 @@ GuessControlValues(void) ControlFile.max_wal_senders = 10; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; ControlFile.maxAlign = MAXIMUM_ALIGNOF; @@ -914,6 +915,7 @@ RewriteControlFile(void) ControlFile.max_wal_senders = 10; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; /* The control file gets flushed here. */ diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1..b616cea347 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -11,6 +11,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact_xlog.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h index 6c8b111ab5..9ba819e9d1 100644 --- a/src/include/access/fdwxact.h +++ b/src/include/access/fdwxact.h @@ -10,24 +10,112 @@ #ifndef FDWXACT_H #define FDWXACT_H +#include "access/fdwxact_xlog.h" #include "foreign/foreign.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/s_lock.h" /* Flag passed to FDW transaction management APIs */ #define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback * without preparation */ +/* Enum to track the status of foreign transaction */ +typedef enum +{ + FDWXACT_STATUS_INVALID = 0, + FDWXACT_STATUS_PREPARING, /* foreign transaction is being prepared */ + FDWXACT_STATUS_PREPARED, /* foreign transaction is prepared */ + FDWXACT_STATUS_COMMITTING, /* foreign prepared transaction is committed */ + FDWXACT_STATUS_ABORTING /* foreign prepared transaction is aborted */ +} FdwXactStatus; + +/* + * Shared memory state of a single foreign transaction. + */ +typedef struct FdwXactData *FdwXact; +typedef struct FdwXactData +{ + FdwXact fdwxact_free_next; /* Next free FdwXact entry */ + + TransactionId local_xid; /* XID of local transaction */ + + /* Information relevant with foreign transaction */ + Oid dbid; + Oid serverid; + Oid userid; + Oid umid; + + /* Foreign transaction status */ + FdwXactStatus status; + slock_t mutex; /* protect the above field */ + + /* + * Note that we need to keep track of two LSNs for each FdwXact. We keep + * track of the start LSN because this is the address we must use to read + * state data back from WAL when committing a FdwXact. We keep track of + * the end LSN because that is the LSN we need to wait for prior to + * commit. + */ + XLogRecPtr insert_start_lsn; /* XLOG offset of inserting this entry + * start */ + XLogRecPtr insert_end_lsn; /* XLOG offset of inserting this entry end */ + + bool valid; /* has the entry been complete and written to + * file? */ + BackendId locking_backend; /* backend currently working on the fdw xact */ + bool ondisk; /* true if prepare state file is on disk */ + bool inredo; /* true if entry was added via xlog_redo */ + + char fdwxact_id[FDWXACT_ID_MAX_LEN]; /* prepared transaction + * identifier */ +} FdwXactData; + +/* + * Shared memory layout for maintaining foreign prepared transaction entries. + * Adding or removing FdwXact entry needs to hold FdwXactLock in exclusive mode, + * and iterating fdwXacts needs that in shared mode. + */ +typedef struct +{ + /* Head of linked list of free FdwXactData structs */ + FdwXact free_fdwxacts; + + /* Number of valid foreign transaction entries */ + int num_fdwxacts; + + /* Upto max_prepared_foreign_xacts entries in the array */ + FdwXact fdwxacts[FLEXIBLE_ARRAY_MEMBER]; /* Variable length array */ +} FdwXactCtlData; + +/* Pointer to the shared memory holding the foreign transactions data */ +FdwXactCtlData *FdwXactCtl; + /* State data for foreign transaction resolution, passed to FDW callbacks */ typedef struct FdwXactRslvState { /* Foreign transaction information */ + char *fdwxact_id; ForeignServer *server; UserMapping *usermapping; int flags; /* OR of FDWXACT_FLAG_xx flags */ } FdwXactRslvState; +/* GUC parameters */ +extern int max_prepared_foreign_xacts; + /* Function declarations */ +extern Size FdwXactShmemSize(void); +extern void FdwXactShmemInit(void); extern void AtEOXact_FdwXact(bool is_commit); extern void PrePrepare_FdwXact(void); +extern bool FdwXactExists(Oid dbid, Oid serverid, Oid userid); +extern void CheckPointFdwXacts(XLogRecPtr redo_horizon); +extern void RecreateFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, void *content, int len); +extern void RestoreFdwXactData(void); +extern void RecoverFdwXacts(void); +extern TransactionId PrescanFdwXacts(TransactionId oldestActiveXid); #endif /* FDWXACT_H */ diff --git a/src/include/access/fdwxact_xlog.h b/src/include/access/fdwxact_xlog.h new file mode 100644 index 0000000000..b4cec76eae --- /dev/null +++ b/src/include/access/fdwxact_xlog.h @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_xlog.h + * Foreign transaction XLOG definitions. + * + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef FDWXACT_XLOG_H +#define FDWXACT_XLOG_H + +#include "access/xlogreader.h" +#include "lib/stringinfo.h" + +/* Info types for logs related to FDW transactions */ +#define XLOG_FDWXACT_INSERT 0x00 +#define XLOG_FDWXACT_REMOVE 0x10 + +/* Maximum length of the prepared transaction id, borrowed from twophase.c */ +#define FDWXACT_ID_MAX_LEN 200 + +/* + * On disk file structure, also used to WAL + */ +typedef struct +{ + TransactionId local_xid; + Oid dbid; /* database oid where to find foreign server + * and user mapping */ + Oid serverid; /* foreign server where transaction takes + * place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; + char fdwxact_id[FDWXACT_ID_MAX_LEN]; /* foreign txn prepare id */ +} FdwXactOnDiskData; + +typedef struct xl_fdwxact_remove +{ + TransactionId xid; + Oid serverid; + Oid userid; + Oid dbid; + bool force; +} xl_fdwxact_remove; + +extern void fdwxact_redo(XLogReaderState *record); +extern void fdwxact_desc(StringInfo buf, XLogReaderState *record); +extern const char *fdwxact_identify(uint8 info); + +#endif /* FDWXACT_XLOG_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index f582cf535f..5ab1f57212 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_FDWXACT_ID, "Fdw Transaction", fdwxact_redo, fdwxact_desc, fdwxact_identify, NULL, NULL, NULL) diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 91786da784..3d35f89ae0 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -36,6 +36,7 @@ extern void PostPrepare_Twophase(void); extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held); extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held); +extern bool TwoPhaseExists(TransactionId xid); extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 224cae0246..0823baf1a1 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -236,6 +236,7 @@ typedef struct xl_parameter_change int max_worker_processes; int max_wal_senders; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; int wal_level; bool wal_log_hints; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index e3f48158ce..5673ec7299 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -179,6 +179,7 @@ typedef struct ControlFileData int max_worker_processes; int max_wal_senders; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; bool track_commit_timestamp; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d27336adcd..1830364fcc 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6030,6 +6030,24 @@ proargnames => '{type,object_names,object_args,classid,objid,objsubid}', prosrc => 'pg_get_object_address' }, +{ oid => '9706', descr => 'view foreign transactions', + proname => 'pg_foreign_xacts', prorows => '1000', proretset => 't', + provolatile => 'v', prorettype => 'record', proargtypes => '', + proallargtypes => '{xid,oid,oid,text,text,int4}', + proargmodes => '{o,o,o,o,o,o}', + proargnames => '{xid,serverid,userid,state,identifier,locker_pid}', + prosrc => 'pg_foreign_xacts' }, +{ oid => '9707', descr => 'remove foreign transaction without resolution', + proname => 'pg_remove_foreign_xact', provolatile => 'v', prorettype => 'bool', + proargtypes => 'xid oid oid', + proargnames => '{xid,serverid,userid}', + prosrc => 'pg_remove_foreign_xact' }, +{ oid => '9708', descr => 'resolve one foreign transaction', + proname => 'pg_resolve_foreign_xact', provolatile => 'v', prorettype => 'bool', + proargtypes => 'xid oid oid', + proargnames => '{xid,serverid,userid}', + prosrc => 'pg_resolve_foreign_xact' }, + { oid => '2079', descr => 'is table visible in search path?', proname => 'pg_table_is_visible', procost => '10', provolatile => 's', prorettype => 'bool', proargtypes => 'oid', prosrc => 'pg_table_is_visible' }, diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 570e605e1a..eb86b09f7a 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -171,6 +171,7 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef void (*PrepareForeignTransaction_function) (FdwXactRslvState *frstate); typedef void (*CommitForeignTransaction_function) (FdwXactRslvState *frstate); typedef void (*RollbackForeignTransaction_function) (FdwXactRslvState *frstate); @@ -254,6 +255,7 @@ typedef struct FdwRoutine /* Support functions for transaction management */ CommitForeignTransaction_function CommitForeignTransaction; RollbackForeignTransaction_function RollbackForeignTransaction; + PrepareForeignTransaction_function PrepareForeignTransaction; } FdwRoutine; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c38b689710..30d3a7eea0 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1010,6 +1010,9 @@ typedef enum WAIT_EVENT_DATA_FILE_TRUNCATE, WAIT_EVENT_DATA_FILE_WRITE, WAIT_EVENT_DSM_FILL_ZERO_WRITE, + WAIT_EVENT_FDWXACT_FILE_READ, + WAIT_EVENT_FDWXACT_FILE_WRITE, + WAIT_EVENT_FDWXACT_FILE_SYNC, WAIT_EVENT_LOCK_FILE_ADDTODATADIR_READ, WAIT_EVENT_LOCK_FILE_ADDTODATADIR_SYNC, WAIT_EVENT_LOCK_FILE_ADDTODATADIR_WRITE, diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index b01fa52139..1fd53bcd60 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -93,5 +93,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetFdwXactUnresolvedXmin(TransactionId xmin); +extern TransactionId ProcArrayGetFdwXactUnresolvedXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a687e99d1e..88734ee4e4 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1352,6 +1352,13 @@ pg_file_settings| SELECT a.sourcefile, a.applied, a.error FROM pg_show_all_file_settings() a(sourcefile, sourceline, seqno, name, setting, applied, error); +pg_foreign_xacts| SELECT f.xid, + f.serverid, + f.userid, + f.state, + f.identifier, + f.locker_pid + FROM pg_foreign_xacts() f(xid, serverid, userid, state, identifier, locker_pid); pg_group| SELECT pg_authid.rolname AS groname, pg_authid.oid AS grosysid, ARRAY( SELECT pg_auth_members.member -- 2.27.0