From 1e27ac62763e112810db7b6279d0862e962db403 Mon Sep 17 00:00:00 2001 From: Andrey Lepikhov Date: Tue, 9 Jun 2020 15:02:39 +0500 Subject: [PATCH 2/3] Global-snapshots --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/global_snapshot.c | 755 ++++++++++++++++++ src/backend/access/transam/twophase.c | 156 ++++ src/backend/access/transam/xact.c | 29 + src/backend/access/transam/xlog.c | 2 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 92 ++- src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/storage/lmgr/proc.c | 5 + src/backend/utils/misc/guc.c | 13 +- src/backend/utils/misc/postgresql.conf.sample | 2 + src/backend/utils/time/snapmgr.c | 167 +++- src/include/access/global_snapshot.h | 72 ++ src/include/access/twophase.h | 1 + src/include/catalog/pg_proc.dat | 13 + src/include/datatype/timestamp.h | 3 + src/include/fmgr.h | 1 + src/include/portability/instr_time.h | 10 + src/include/storage/proc.h | 15 + src/include/storage/procarray.h | 8 + src/include/utils/snapmgr.h | 3 + src/include/utils/snapshot.h | 8 + 22 files changed, 1353 insertions(+), 7 deletions(-) create mode 100644 src/backend/access/transam/global_snapshot.c create mode 100644 src/include/access/global_snapshot.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 60ff8b141e..6de567a79b 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -16,6 +16,7 @@ OBJS = \ clog.o \ commit_ts.o \ global_csn_log.o \ + global_snapshot.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c new file mode 100644 index 0000000000..bac16828bb --- /dev/null +++ b/src/backend/access/transam/global_snapshot.c @@ -0,0 +1,755 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.c + * Support for cross-node snapshot isolation. 
+ * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/global_snapshot.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/global_csn_log.h" +#include "access/global_snapshot.h" +#include "access/transam.h" +#include "access/twophase.h" +#include "access/xact.h" +#include "portability/instr_time.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "miscadmin.h" + +/* Raise a warning if imported global_csn exceeds ours by this value. */ +#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */ + +/* + * GlobalSnapshotState + * + * Do not trust local clocks to be strictly monotonical and save last acquired + * value so later we can compare next timestamp with it. Accessed through + * GlobalSnapshotGenerate() and GlobalSnapshotSync(). + */ +typedef struct +{ + GlobalCSN last_global_csn; + volatile slock_t lock; +} GlobalSnapshotState; + +static GlobalSnapshotState *gsState; + + +/* + * GUC to delay advance of oldestXid for this amount of time. Also determines + * the size GlobalSnapshotXidMap circular buffer. + */ +int global_snapshot_defer_time; + +/* + * Enables this module. + */ +extern bool track_global_snapshots; + +/* + * GlobalSnapshotXidMap + * + * To be able to install global snapshot that points to past we need to keep + * old versions of tuples and therefore delay advance of oldestXid. Here we + * keep track of correspondence between snapshot's global_csn and oldestXid + * that was set at the time when the snapshot was taken. Much like the + * snapshot too old's OldSnapshotControlData does, but with finer granularity + * to seconds. 
+ * + * Different strategies can be employed to hold oldestXid (e.g. we can track + * oldest global_csn-based snapshot among cluster nodes and map it to oldestXid + * on each node) but the one implemented here tries to avoid cross-node + * communications which are tricky in case of postgres_fdw. + * + * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores + * correspondence between current global_csn and oldestXmin in a sparse way: + * global_csn is rounded to seconds (and here we use the fact that global_csn + * is just a timestamp) and oldestXmin is stored in the circular buffer where + * rounded global_csn acts as an offset from current circular buffer head. + * Size of the circular buffer is controlled by global_snapshot_defer_time GUC. + * + * When a global snapshot arrives from a different node we check that its + * global_csn is still in our map, otherwise we'll error out with "snapshot too + * old" message. If global_csn is successfully mapped to oldestXid we move + * backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin to the + * mapped oldestXid. That way GetOldestXmin() can take into account backends + * with imported global snapshot and old tuple versions will be preserved. + * + * Also while calculating oldestXmin for our map in presence of imported + * global snapshots we should use proc->originalXmin instead of pgxact->xmin + * that was set during import. Otherwise, we can create a feedback loop: + * xmin's of imported global snapshots were calculated using our map and new + * entries in map are going to be calculated based on those xmin's, and there + * is a risk of getting stuck forever with one non-increasing oldestXmin. All other + * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions + * are preserved. 
+ */ +typedef struct GlobalSnapshotXidMap +{ + int head; /* offset of current freshest value */ + int size; /* total size of circular buffer */ + GlobalCSN_atomic last_csn_seconds; /* last rounded global_csn that changed + * xmin_by_second[] */ + TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */ +} +GlobalSnapshotXidMap; + +static GlobalSnapshotXidMap *gsXidMap; + + +/* Estimate shared memory space needed, using overflow-checked add_size()/mul_size() */ +Size +GlobalSnapshotShmemSize(void) +{ + Size size = 0; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotState))); + } + + if (global_snapshot_defer_time > 0) + { + size = add_size(size, sizeof(GlobalSnapshotXidMap)); + size = add_size(size, mul_size(global_snapshot_defer_time, sizeof(TransactionId))); + size = MAXALIGN(size); + } + + return size; +} + +/* Init shared memory structures (gsState, plus gsXidMap and its buffer when global_snapshot_defer_time > 0) */ +void +GlobalSnapshotShmemInit(void) +{ + bool found; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + gsState = ShmemInitStruct("gsState", + sizeof(GlobalSnapshotState), + &found); + if (!found) + { + gsState->last_global_csn = 0; + SpinLockInit(&gsState->lock); + } + } + + if (global_snapshot_defer_time > 0) + { + gsXidMap = ShmemInitStruct("gsXidMap", + sizeof(GlobalSnapshotXidMap), + &found); + if (!found) + { + int i; + + pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0); + gsXidMap->head = 0; + gsXidMap->size = global_snapshot_defer_time; + gsXidMap->xmin_by_second = + ShmemAlloc(sizeof(TransactionId)*gsXidMap->size); + + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = InvalidTransactionId; + } + } +} + +/* + * GlobalSnapshotStartup + * + * Set gsXidMap entries to oldestActiveXID during startup. + */ +void +GlobalSnapshotStartup(TransactionId oldestActiveXID) +{ + /* + * Run only if we have initialized shared memory and gsXidMap + * is enabled. 
+ */ + if (IsNormalProcessingMode() && + track_global_snapshots && global_snapshot_defer_time > 0) + { + int i; + + Assert(TransactionIdIsValid(oldestActiveXID)); + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = oldestActiveXID; + ProcArraySetGlobalSnapshotXmin(oldestActiveXID); + } +} + +/* + * GlobalSnapshotMapXmin + * + * Maintain circular buffer of oldestXmins for several seconds in the past. This + * buffer allows shifting oldestXmin into the past when a backend is importing + * a global transaction. Otherwise old versions of tuples that were needed for + * this transaction can be recycled by other processes (vacuum, HOT, etc). + * + * Locking here is not trivial. Called upon each snapshot creation after + * ProcArrayLock is released. Such usage creates several race conditions. It + * is possible that a backend that got global_csn called GlobalSnapshotMapXmin() + * only after other backends managed to get snapshot and complete + * GlobalSnapshotMapXmin() call, or even committed. This is safe because + * + * * We already hold our xmin in MyPgXact, so our snapshot will not be + * harmed even though ProcArrayLock is released. + * + * * snapshot_global_csn is always pessimistically rounded up to the next + * second. + * + * * For performance reasons, xmin value for particular second is filled + * only once. Because of that instead of writing to buffer just our + * xmin (which is enough for our snapshot), we bump oldestXmin there -- + * it mitigates the possibility of damaging someone else's snapshot by + * writing to the buffer too advanced value in case of slowness of + * another backend who generated csn earlier, but didn't manage to + * insert it before us. + * + * * if GlobalSnapshotMapXmin() finds a gap of several seconds between + * current call and latest completed call then it should fill that gap + * with the latest known value instead of the new one. 
Otherwise it is + * possible (however highly unlikely) that this gap also happened + * between taking snapshot and call to GlobalSnapshotMapXmin() for some + * backend. And we are at risk of filling the circular buffer with + * oldestXmin's that are bigger than they actually were. + */ +void +GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn) +{ + int offset, gap, i; + GlobalCSN csn_seconds; + GlobalCSN last_csn_seconds; + volatile TransactionId oldest_deferred_xmin; + TransactionId current_oldest_xmin, previous_oldest_xmin; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* + * Round up global_csn to the next second -- pessimistically and safely. + */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1); + + /* + * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock + * if oldestXid was already written to xmin_by_second[] for this rounded + * global_csn. + */ + if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds) + return; + + /* Ok, we have new entry (or entries) */ + LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE); + + /* Re-check last_csn_seconds under lock */ + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (last_csn_seconds >= csn_seconds) + { + LWLockRelease(GlobalSnapshotXidMapLock); + return; + } + pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds); + + /* + * Compute oldest_xmin. + * + * It was possible to calculate oldest_xmin during corresponding snapshot + * creation, but GetSnapshotData() intentionally reads only PgXact, but not + * PgProc. And we need info about originalXmin (see comment to gsXidMap) + * which is stored in PgProc because of warnings in comments around PgXact + * about extending it with new fields. So just calculate oldest_xmin again, + * that anyway happens quite rarely. 
+ */ + current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN); + + previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + + Assert(TransactionIdIsNormal(current_oldest_xmin)); + Assert(TransactionIdIsNormal(previous_oldest_xmin) || !track_global_snapshots); + + gap = csn_seconds - last_csn_seconds; + offset = csn_seconds % gsXidMap->size; + + /* Sanity check before we update head and gap */ + Assert( gap >= 1 ); + Assert( (gsXidMap->head + gap) % gsXidMap->size == offset ); + + gap = gap > gsXidMap->size ? gsXidMap->size : gap; + gsXidMap->head = offset; + + /* Fill new entry with current_oldest_xmin */ + gsXidMap->xmin_by_second[offset] = current_oldest_xmin; + + /* + * If we have gap then fill it with previous_oldest_xmin for reasons + * outlined in comment above this function. + */ + for (i = 1; i < gap; i++) + { + offset = (offset + gsXidMap->size - 1) % gsXidMap->size; + gsXidMap->xmin_by_second[offset] = previous_oldest_xmin; + } + + oldest_deferred_xmin = + gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ]; + + LWLockRelease(GlobalSnapshotXidMapLock); + + /* + * Advance procArray->global_snapshot_xmin after we released + * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it + * never goes backwards regardless of how slow we can do that. + */ + Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin, + ProcArrayGetGlobalSnapshotXmin())); + ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin); +} + + +/* + * GlobalSnapshotToXmin + * + * Get oldestXmin that took place when snapshot_global_csn was taken. 
+ */ +TransactionId +GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn) +{ + TransactionId xmin; + GlobalCSN csn_seconds; + volatile GlobalCSN last_csn_seconds; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* Round down to get conservative estimates */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC); + + LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED); + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (csn_seconds > last_csn_seconds) + { + /* we don't have entry for this global_csn yet, return latest known */ + xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + } + else if (last_csn_seconds - csn_seconds < gsXidMap->size) + { + /* we are good, retrieve value from our map */ + Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head); + xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size]; + } + else + { + /* requested global_csn is too old, let caller know */ + xmin = InvalidTransactionId; + } + LWLockRelease(GlobalSnapshotXidMapLock); + + return xmin; +} + +/* + * GlobalSnapshotGenerate + * + * Generate GlobalCSN which is actually a local time. Also we are forcing + * this time to be always increasing. Since now it is not uncommon to have + * millions of read transactions per second we are trying to use nanoseconds + * if such time resolution is available. + */ +GlobalCSN +GlobalSnapshotGenerate(bool locked) +{ + instr_time current_time; + GlobalCSN global_csn; + + Assert(track_global_snapshots || global_snapshot_defer_time > 0); + + /* + * TODO: create some macro that add small random shift to current time. + */ + INSTR_TIME_SET_CURRENT(current_time); + global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time); + + /* TODO: change to atomics? 
*/ + if (!locked) + SpinLockAcquire(&gsState->lock); + + if (global_csn <= gsState->last_global_csn) + global_csn = ++gsState->last_global_csn; + else + gsState->last_global_csn = global_csn; + + if (!locked) + SpinLockRelease(&gsState->lock); + + return global_csn; +} + +/* + * GlobalSnapshotSync + * + * Due to time desynchronization on different nodes we can receive global_csn + * which is greater than global_csn on this node. To preserve proper isolation + * this node needs to wait until such global_csn is reached by the local clock. + * + * This should happen relatively rarely if nodes are running NTP/PTP/etc. + * Complain if wait time is more than SNAP_DESYNC_COMPLAIN. + */ +void +GlobalSnapshotSync(GlobalCSN remote_gcsn) +{ + GlobalCSN local_gcsn; + GlobalCSN delta; + + Assert(track_global_snapshots); + + for(;;) + { + SpinLockAcquire(&gsState->lock); + if (gsState->last_global_csn > remote_gcsn) + { + /* Everything is fine */ + SpinLockRelease(&gsState->lock); + return; + } + else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn) + { + /* + * Everything is fine too, but last_global_csn wasn't updated for + * some time. + */ + SpinLockRelease(&gsState->lock); + return; + } + SpinLockRelease(&gsState->lock); + + /* Okay we need to sleep now */ + delta = remote_gcsn - local_gcsn; + if (delta > SNAP_DESYNC_COMPLAIN) + ereport(WARNING, + (errmsg("remote global snapshot exceeds ours by more than a second"), + errhint("Consider running NTPd on servers participating in global transaction"))); + + /* TODO: report this sleeptime somewhere? */ + pg_usleep((long) (delta/NSECS_PER_USEC)); + + /* + * The spinlock is released here, so honor pending cancel/terminate + * requests, then loop to check that we actually slept long enough. + */ + CHECK_FOR_INTERRUPTS(); + } + + Assert(false); /* Should not happen */ +} + +/* + * TransactionIdGetGlobalCSN + * + * Get GlobalCSN for specified TransactionId taking care about special xids, + * xids beyond TransactionXmin and InDoubt states. 
+ */ +GlobalCSN +TransactionIdGetGlobalCSN(TransactionId xid) +{ + GlobalCSN global_csn; + + Assert(track_global_snapshots); + + /* Handle permanent TransactionId's for which we don't have mapping */ + if (!TransactionIdIsNormal(xid)) + { + if (xid == InvalidTransactionId) + return AbortedGlobalCSN; + if (xid == FrozenTransactionId || xid == BootstrapTransactionId) + return FrozenGlobalCSN; + Assert(false); /* Should not happen */ + } + + /* + * For xids that are less than TransactionXmin GlobalCSNLog can be already + * trimmed but we know that such transaction is definitely not concurrently + * running according to any snapshot including timetravel ones. Callers + * should check TransactionDidCommit after. + */ + if (TransactionIdPrecedes(xid, TransactionXmin)) + return FrozenGlobalCSN; + + /* Read GlobalCSN from SLRU */ + global_csn = GlobalCSNLogGetCSN(xid); + + /* + * If we faced InDoubt state then transaction is being committed and we + * should wait until GlobalCSN will be assigned so that visibility check + * could decide whether tuple is in snapshot. See also comments in + * GlobalSnapshotPrecommit(). + */ + if (GlobalCSNIsInDoubt(global_csn)) + { + XactLockTableWait(xid, NULL, NULL, XLTW_None); + global_csn = GlobalCSNLogGetCSN(xid); + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsAborted(global_csn)); + } + + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsInProgress(global_csn) || + GlobalCSNIsAborted(global_csn)); + + return global_csn; +} + +/* + * XidInvisibleInGlobalSnapshot + * + * Version of XidInMVCCSnapshot for global transactions. For non-imported + * global snapshots this should give same results as XidInLocalMVCCSnapshot + * (except that aborts will be shown as invisible without going to clog) and to + * ensure such behaviour XidInMVCCSnapshot is coated with asserts that check + * equivalence of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in + * case of ordinary snapshot. 
+ */ +bool +XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot) +{ + GlobalCSN csn; + + Assert(track_global_snapshots); + + csn = TransactionIdGetGlobalCSN(xid); + + if (GlobalCSNIsNormal(csn)) + { + if (csn < snapshot->global_csn) + return false; + else + return true; + } + else if (GlobalCSNIsFrozen(csn)) + { + /* It is bootstrap or frozen transaction */ + return false; + } + else + { + /* It is aborted or in-progress */ + Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn)); + if (GlobalCSNIsAborted(csn)) + Assert(TransactionIdDidAbort(xid)); + return true; + } +} + + +/***************************************************************************** + * Functions to handle distributed commit on transaction coordinator: + * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent(). + * Correspoding functions for remote nodes are defined in twophase.c: + * pg_global_snapshot_prepare/pg_global_snapshot_assign. + *****************************************************************************/ + + +/* + * GlobalSnapshotPrepareCurrent + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +GlobalCSN +GlobalSnapshotPrepareCurrent() +{ + TransactionId xid = GetCurrentTransactionIdIfAny(); + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (TransactionIdIsValid(xid)) + { + TransactionId *subxids; + int nsubxids = xactGetCommittedChildren(&subxids); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + + /* Nothing to write if we don't have xid */ + + return GlobalSnapshotGenerate(false); +} + +/* + * GlobalSnapshotAssignCsnCurrent + * + * Asign GlobalCSN for currently active transaction. 
GlobalCSN is supposedly + * the maximum of the values returned by GlobalSnapshotPrepareCurrent and + * pg_global_snapshot_prepare. + */ +void +GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn) +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snapshot_assign expects normal global_csn"))); + + /* Skip empty transactions (no xid assigned, so nothing to stamp) */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return; + + /* Set global_csn and defuse ProcArrayEndTransaction from assigning one */ + pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn); +} + + +/***************************************************************************** + * Functions to handle global and local transactions commit. + * + * For local transactions GlobalSnapshotPrecommit sets InDoubt state before + * ProcArrayEndTransaction is called and transaction data potentially becomes + * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in + * twophase case) then acquires global_csn under ProcArray lock and stores it + * in proc->assignedGlobalCsn. It's important that global_csn for commit is + * generated under ProcArray lock, otherwise global and local snapshots won't + * be equivalent. A subsequent call to GlobalSnapshotCommit will write + * proc->assignedGlobalCsn to GlobalCSNLog. + * + * The same rules apply to global transaction, except that global_csn is already + * assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign and + * GlobalSnapshotPrecommit is basically no-op. + * + * GlobalSnapshotAbort is slightly different compared to commit because abort + * can skip InDoubt phase and can be called for transaction subtree. 
+ *****************************************************************************/ + + +/* + * GlobalSnapshotAbort + * + * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts + * since no concurrent transactions allowed to see aborted data anyway. + */ +void +GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + if (!track_global_snapshots) + return; + + GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN); + + /* + * Clean assignedGlobalCsn anyway, as it was possibly set in + * GlobalSnapshotAssignCsnCurrent. + */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} + +/* + * GlobalSnapshotPrecommit + * + * Set InDoubt status for local transaction that we are going to commit. + * This step is needed to achieve consistency between local snapshots and + * global csn-based snapshots. We don't hold ProcArray lock while writing + * csn for transaction in SLRU but instead we set InDoubt status before + * transaction is deleted from ProcArray so the readers who will read csn + * in the gap between ProcArray removal and GlobalCSN assignment can wait + * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN(). + * + * For global transaction this does nothing as InDoubt state was written + * earlier. + * + * This should be called only from parallel group leader before backend is + * deleted from ProcArray. 
+ */ +void +GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN; + bool in_progress; + + if (!track_global_snapshots) + return; + + /* Set InDoubt status if it is local transaction */ + in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn, + &oldAssignedGlobalCsn, + InDoubtGlobalCSN); + if (in_progress) + { + Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + else + { + /* Otherwise we should have valid GlobalCSN by this time */ + Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn)); + /* Also global transaction should already be in InDoubt state */ + Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid))); + } +} + +/* + * GlobalSnapshotCommit + * + * Write GlobalCSN that were acquired earlier to GlobalCsnLog. Should be + * preceded by GlobalSnapshotPrecommit() so readers can wait until we finally + * finished writing to SLRU. + * + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, so that TransactionIdGetGlobalCSN can wait on this + * lock for GlobalCSN. 
+ */ +void +GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + volatile GlobalCSN assigned_global_csn; + + if (!track_global_snapshots) + return; + + if (!TransactionIdIsValid(xid)) + { + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsInProgress(assigned_global_csn)); + return; + } + + /* Finally write resulting GlobalCSN in SLRU */ + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsNormal(assigned_global_csn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, assigned_global_csn); + + /* Reset for next transaction */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 9a69fc1e09..c89d1005c6 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/global_csn_log.h" #include "access/htup_details.h" #include "access/subtrans.h" @@ -1480,8 +1481,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nabortrels, abortrels, gid); + /* + * GlobalSnapshot callbacks that should be called right before we are + * going to become visible. Details in comments to this functions. + */ + if (isCommit) + GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children); + else + GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children); + + ProcArrayRemove(proc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog. + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, since TransactionIdGetGlobalCSN relies on + * XactLockTableWait to await global_csn. 
+ */ + if (isCommit) + { + GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children); + } + else + { + Assert(GlobalCSNIsInProgress( + pg_atomic_read_u64(&proc->assignedGlobalCsn))); + } + /* * In case we fail while running the callbacks, mark the gxact invalid so * no one else will try to commit/rollback, and so it will be recycled if @@ -2442,3 +2469,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * GlobalSnapshotPrepareTwophase + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + * + * This function is a counterpart of GlobalSnapshotPrepareCurrent() for + * twophase transactions. + */ +static GlobalCSN +GlobalSnapshotPrepareTwophase(const char *gid) +{ + GlobalTransaction gxact; + PGXACT *pgxact; + char *buf; + TransactionId xid; + xl_xact_parsed_prepare parsed; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. 
+ */ + gxact = LockGXact(gid, GetUserId()); + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + xid = pgxact->xid; + + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + + ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed); + + GlobalCSNLogSetCSN(xid, parsed.nsubxacts, + parsed.subxacts, InDoubtGlobalCSN); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + + pfree(buf); + + return GlobalSnapshotGenerate(false); +} + +/* + * SQL interface to GlobalSnapshotPrepareTwophase() + * + * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT + */ +Datum +pg_global_snapshot_prepare(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn; + + global_csn = GlobalSnapshotPrepareTwophase(gid); + + PG_RETURN_INT64(global_csn); +} + + +/* + * TwoPhaseAssignGlobalCsn + * + * Asign GlobalCSN for currently active transaction. GlobalCSN is supposedly + * maximal among of values returned by GlobalSnapshotPrepareCurrent and + * pg_global_snapshot_prepare. + * + * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for + * twophase transactions. + */ +static void +GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn) +{ + GlobalTransaction gxact; + PGPROC *proc; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snapshot_assign expects normal global_csn"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. 
+ */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + + /* Set global_csn and defuse ProcArrayRemove from assigning one. */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); +} + +/* + * SQL interface to GlobalSnapshotAssignCsnTwoPhase() + * + * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn' + */ +Datum +pg_global_snapshot_assign(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn = PG_GETARG_INT64(1); + + GlobalSnapshotAssignCsnTwoPhase(gid, global_csn); + PG_RETURN_VOID(); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cd30b62d36..042239ec0e 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1433,6 +1434,14 @@ RecordTransactionCommit(void) /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; + + /* + * Mark our transaction as InDoubt in GlobalCsnLog and get ready for + * commit. + */ + if (markXidCommitted) + GlobalSnapshotPrecommit(MyProc, xid, nchildren, children); + cleanup: /* Clean up local data */ if (rels) @@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact) */ TransactionIdAbortTree(xid, nchildren, children); + /* + * Mark our transaction as Aborted in GlobalCsnLog. + */ + GlobalSnapshotAbort(MyProc, xid, nchildren, children); + END_CRIT_SECTION(); /* Compute latestXid while we have the child XIDs handy */ @@ -2183,6 +2197,21 @@ CommitTransaction(void) */ ProcArrayEndTransaction(MyProc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog. 
+ * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks. + */ + if (!is_parallel_worker) + { + TransactionId xid = GetTopTransactionIdIfAny(); + TransactionId *subxids; + int nsubxids; + + nsubxids = xactGetCommittedChildren(&subxids); + GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids); + } + /* * This is all post-commit cleanup. Note that if an error is raised here, * it's too late to abort the transaction. This should be just diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4ffe4aad03..aa91526468 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7063,6 +7063,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); /* * If we're beginning at a shutdown checkpoint, we know that @@ -7881,6 +7882,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index dc2d2959c4..d1819dc2c8 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -145,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, ApplyLauncherShmemSize()); + size = add_size(size, GlobalSnapshotShmemSize()); size = add_size(size, SnapMgrShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); @@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + GlobalSnapshotShmemInit(); #ifdef 
EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 486da77f68..90c0e90b46 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -47,6 +47,7 @@ #include "access/clog.h" #include "access/global_csn_log.h" +#include "access/global_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -95,6 +96,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* xmin of oldest active global snapshot */ + TransactionId global_snapshot_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -250,6 +253,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->global_snapshot_xmin = InvalidTransactionId; } allProcs = ProcGlobal->allProcs; @@ -353,6 +357,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT PREPARED. After lock is released consequent + * GlobalSnapshotCommit() will write this value to GlobalCsnLog. + * + * In case of global commit proc->assignedGlobalCsn is already set + * by prior AssignGlobalCsn(). 
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } else { @@ -433,6 +448,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -455,6 +472,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -468,6 +487,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT. After lock is released subsequent GlobalSnapshotCommit() will + * write this value to GlobalCsnLog. + * + * In case of global commit MyProc->assignedGlobalCsn is already set + * by prior AssignGlobalCsn(). + * + * TODO: in case of group commit we can generate one GlobalSnapshot for + * whole group to save time on timestamp acquisition.
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } /* @@ -611,6 +644,7 @@ ProcArrayClearTransaction(PGPROC *proc) pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; proc->recoveryConflictPending = false; /* redundant, but just in case */ @@ -1313,6 +1347,7 @@ GetOldestXmin(Relation rel, int flags) TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId global_snapshot_xmin = InvalidTransactionId; /* * If we're not computing a relation specific limit, or if a shared @@ -1349,8 +1384,9 @@ GetOldestXmin(Relation rel, int flags) proc->databaseId == MyDatabaseId || proc->databaseId == 0) /* always include WalSender */ { - /* Fetch xid just once - see GetNewTransactionId */ + /* Fetch both xids just once - see GetNewTransactionId */ TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid); + TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin); /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && @@ -1363,8 +1399,17 @@ GetOldestXmin(Relation rel, int flags) * We must check both Xid and Xmin because a transaction might * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. + * + * In case of oldestXmin calculation for GlobalSnapshotMapXmin() + * pgxact->xmin should be changed to proc->originalXmin. Details + * in comments to GlobalSnapshotMapXmin.
*/ - xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if ((flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(original_xmin)) + xid = original_xmin; + else + xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; @@ -1378,6 +1423,7 @@ GetOldestXmin(Relation rel, int flags) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin(); if (RecoveryInProgress()) { @@ -1419,6 +1465,11 @@ GetOldestXmin(Relation rel, int flags) result = FirstNormalTransactionId; } + if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(global_snapshot_xmin) && + NormalTransactionIdPrecedes(global_snapshot_xmin, result)) + result = global_snapshot_xmin; + /* * Check whether there are replication slots requiring an older xmin. */ @@ -1513,8 +1564,10 @@ GetSnapshotData(Snapshot snapshot) int count = 0; int subcount = 0; bool suboverflowed = false; + GlobalCSN global_csn = FrozenGlobalCSN; TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId global_snapshot_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1706,10 +1759,18 @@ GetSnapshotData(Snapshot snapshot) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin(); if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; + /* + * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays + * synchronized. 
+ */ + if (track_global_snapshots) + global_csn = GlobalSnapshotGenerate(false); + LWLockRelease(ProcArrayLock); /* @@ -1725,6 +1786,10 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + if (/*track_global_snapshots && */TransactionIdIsValid(global_snapshot_xmin) && + TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = global_snapshot_xmin; + /* Check whether there's a replication slot requiring an older xmin. */ if (TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) @@ -1780,6 +1845,11 @@ GetSnapshotData(Snapshot snapshot) MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin); } + snapshot->imported_global_csn = false; + snapshot->global_csn = global_csn; + if (global_snapshot_defer_time > 0 && IsUnderPostmaster) + GlobalSnapshotMapXmin(snapshot->global_csn); + return snapshot; } @@ -3127,6 +3197,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetGlobalSnapshotXmin + */ +void +ProcArraySetGlobalSnapshotXmin(TransactionId xmin) +{ + /* We rely on atomic fetch/store of xid */ + procArray->global_snapshot_xmin = xmin; +} + +/* + * ProcArrayGetGlobalSnapshotXmin + */ +TransactionId +ProcArrayGetGlobalSnapshotXmin(void) +{ + return procArray->global_snapshot_xmin; +} #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index aa904b1f17..45d5b8e6ed 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -51,3 +51,4 @@ OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 XactTruncationLock 44 GlobalCSNLogControlLock 45 +GlobalSnapshotXidMapLock 46 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index f5eef6fa4e..cad38c18a6 100644 --- a/src/backend/storage/lmgr/proc.c +++ 
b/src/backend/storage/lmgr/proc.c @@ -37,6 +37,7 @@ #include "access/transam.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "access/xact.h" #include "miscadmin.h" #include "pgstat.h" @@ -441,6 +442,9 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO); + MyProc->originalXmin = InvalidTransactionId; + pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -584,6 +588,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + MyProc->originalXmin = InvalidTransactionId; #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 4910e4fc66..79d7123f9f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -28,6 +28,7 @@ #include "access/commit_ts.h" #include "access/gin.h" +#include "access/global_snapshot.h" #include "access/rmgr.h" #include "access/tableam.h" #include "access/transam.h" @@ -1178,7 +1179,7 @@ static struct config_bool ConfigureNamesBool[] = gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.") }, &track_global_snapshots, - true, /* XXX: set true to simplify tesing. 
XXX2: Seems that RESOURCES_MEM isn't the best catagory */ + false, /* XXX: Seems that RESOURCES_MEM isn't the best category */ NULL, NULL, NULL }, { @@ -2467,6 +2468,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER, + gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."), + NULL + }, + &global_snapshot_defer_time, + 5, 0, INT_MAX, + NULL, NULL, NULL + }, + /* * See also CheckRequiredParameterValues() if this parameter changes */ diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ac02bd0c00..cbd6de119a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -306,6 +306,8 @@ # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#global_snapshot_defer_time = 5 # minimal age of records which are allowed to be + # vacuumed, in seconds # - Standby Servers - diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 1c063c592c..3d925a7866 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -48,6 +48,7 @@ #include #include +#include "access/global_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -247,6 +248,8 @@ typedef struct SerializedSnapshotData CommandId curcid; TimestampTz whenTaken; XLogRecPtr lsn; + GlobalCSN global_csn; + bool imported_global_csn; } SerializedSnapshotData; Size @@ -1024,7 +1027,9 @@ SnapshotResetXmin(void) pairingheap_first(&RegisteredSnapshots)); if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin)) + { MyPgXact->xmin = minSnapshot->xmin; + } } /* @@ -2115,6 +2120,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) serialized_snapshot.curcid = snapshot->curcid;
serialized_snapshot.whenTaken = snapshot->whenTaken; serialized_snapshot.lsn = snapshot->lsn; + serialized_snapshot.global_csn = snapshot->global_csn; + serialized_snapshot.imported_global_csn = snapshot->imported_global_csn; /* * Ignore the SubXID array if it has overflowed, unless the snapshot was @@ -2189,6 +2196,8 @@ RestoreSnapshot(char *start_address) snapshot->curcid = serialized_snapshot.curcid; snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->lsn = serialized_snapshot.lsn; + snapshot->global_csn = serialized_snapshot.global_csn; + snapshot->imported_global_csn = serialized_snapshot.imported_global_csn; /* Copy XIDs, if present. */ if (serialized_snapshot.xcnt > 0) @@ -2228,8 +2237,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) } /* - * XidInMVCCSnapshot - * Is the given XID still-in-progress according to the snapshot? + * XidInLocalMVCCSnapshot + * Is the given XID still-in-progress according to the local snapshot? * * Note: GetSnapshotData never stores either top xid or subxids of our own * backend into a snapshot, so these xids will not be reported as "running" @@ -2237,8 +2246,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) * TransactionIdIsCurrentTransactionId first, except when it's known the * XID could not be ours anyway. */ -bool -XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +static bool +XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot) { uint32 i; @@ -2348,3 +2357,153 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) return false; } + +/* + * XidInMVCCSnapshot + * + * Check whether this xid is in snapshot, taking into account fact that + * snapshot can be global. When track_global_snapshots is switched off + * just call XidInLocalMVCCSnapshot(). 
+ */ +bool +XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +{ + bool in_snapshot; + + if (snapshot->imported_global_csn) + { + Assert(track_global_snapshots); + /* No point to using snapshot info except CSN */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + + in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot); + + if (!track_global_snapshots) + { + Assert(GlobalCSNIsFrozen(snapshot->global_csn)); + return in_snapshot; + } + + if (in_snapshot) + { + /* + * This xid may be already in unknown state and in that case + * we must wait and recheck. + * + * TODO: this check can be skipped if we know for sure that there were + * no global transactions when this snapshot was taken. That requires + * some changes to mechanisms of global snapshots export/import (if + * backend set xmin then we should have a-priori knowledge that this + * transaction is going to be global or local -- right now this is not + * enforced). Leave that for future and don't complicate this patch. + */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + else + { +#ifdef USE_ASSERT_CHECKING + /* Check that global snapshot gives the same results as local one */ + if (XidInvisibleInGlobalSnapshot(xid, snapshot)) + { + GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid); + Assert(GlobalCSNIsAborted(gcsn)); + } +#endif + return false; + } +} + +/* + * ExportGlobalSnapshot + * + * Export global_csn so that caller can expand this transaction to other + * nodes. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser.
+ */ +GlobalCSN +ExportGlobalSnapshot(void) +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not export global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + return CurrentSnapshot->global_csn; +} + +/* SQL accessor to ExportGlobalSnapshot() */ +Datum +pg_global_snapshot_export(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = ExportGlobalSnapshot(); + PG_RETURN_UINT64(global_csn); +} + +/* + * ImportGlobalSnapshot + * + * Import global_csn and retract this backend's xmin to the value that was + * actual when we had such global_csn. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction has not yet acquired an xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +void +ImportGlobalSnapshot(GlobalCSN snap_global_csn) +{ + volatile TransactionId xmin; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (global_snapshot_defer_time <= 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is positive.", + "global_snapshot_defer_time"))); + + /* + * Call GlobalSnapshotToXmin under ProcArrayLock to avoid the situation where + * the resulting xmin is evicted from the map before we set it as our + * backend's xmin.
+ */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = GlobalSnapshotToXmin(snap_global_csn); + if (!TransactionIdIsValid(xmin)) + { + LWLockRelease(ProcArrayLock); + elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old"); + } + MyProc->originalXmin = MyPgXact->xmin; + MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); + + CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */ + CurrentSnapshot->global_csn = snap_global_csn; + CurrentSnapshot->imported_global_csn = true; + GlobalSnapshotSync(snap_global_csn); + + Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin)); + Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin)); +} + +/* SQL accessor to ImportGlobalSnapshot() */ +Datum +pg_global_snapshot_import(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = PG_GETARG_UINT64(0); + ImportGlobalSnapshot(global_csn); + PG_RETURN_VOID(); +} diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h new file mode 100644 index 0000000000..246b180cfd --- /dev/null +++ b/src/include/access/global_snapshot.h @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.h + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/global_snapshot.h + * + *------------------------------------------------------------------------- + */ +#ifndef GLOBAL_SNAPSHOT_H +#define GLOBAL_SNAPSHOT_H + +#include "port/atomics.h" +#include "storage/lock.h" +#include "utils/snapshot.h" +#include "utils/guc.h" + +/* + * snapshot.h is used in frontend code so atomic variant of GlobalCSN type + * is defined here.
+ */ +typedef pg_atomic_uint64 GlobalCSN_atomic; + +#define InProgressGlobalCSN UINT64CONST(0x0) +#define AbortedGlobalCSN UINT64CONST(0x1) +#define FrozenGlobalCSN UINT64CONST(0x2) +#define InDoubtGlobalCSN UINT64CONST(0x3) +#define FirstNormalGlobalCSN UINT64CONST(0x4) + +#define GlobalCSNIsInProgress(csn) ((csn) == InProgressGlobalCSN) +#define GlobalCSNIsAborted(csn) ((csn) == AbortedGlobalCSN) +#define GlobalCSNIsFrozen(csn) ((csn) == FrozenGlobalCSN) +#define GlobalCSNIsInDoubt(csn) ((csn) == InDoubtGlobalCSN) +#define GlobalCSNIsNormal(csn) ((csn) >= FirstNormalGlobalCSN) + + +extern int global_snapshot_defer_time; + + +extern Size GlobalSnapshotShmemSize(void); +extern void GlobalSnapshotShmemInit(void); +extern void GlobalSnapshotStartup(TransactionId oldestActiveXID); + +extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn); +extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn); + +extern GlobalCSN GlobalSnapshotGenerate(bool locked); + +extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot); + +extern void GlobalSnapshotSync(GlobalCSN remote_gcsn); + +extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid); + +extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid); +extern void GlobalSnapshotAssignCsnGlobal(const char *gid, + GlobalCSN global_csn); + +extern GlobalCSN GlobalSnapshotPrepareCurrent(void); +extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn); + +extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); + +#endif /* GLOBAL_SNAPSHOT_H */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3445..b4899f3754 100644 --- a/src/include/access/twophase.h +++ 
b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "utils/snapshot.h" /* * GlobalTransactionData is defined in twophase.c; other places have no diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 61f2c2f5b4..c76da68a0a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10936,4 +10936,17 @@ proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text', prosrc => 'unicode_is_normalized' }, +# global transaction handling +{ oid => '4388', descr => 'export global transaction snapshot', + proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' }, +{ oid => '4389', descr => 'import global transaction snapshot', + proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' }, +{ oid => '4390', descr => 'prepare distributed transaction for commit, get global_csn', + proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' }, +{ oid => '4391', descr => 'assign global_csn to distributed transaction', + proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' }, ] diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index 6be6d35d1e..583b1beea5 100644 --- a/src/include/datatype/timestamp.h +++ b/src/include/datatype/timestamp.h @@ -93,6 +93,9 @@ typedef struct #define USECS_PER_MINUTE INT64CONST(60000000) #define USECS_PER_SEC INT64CONST(1000000) +#define NSECS_PER_SEC INT64CONST(1000000000) +#define NSECS_PER_USEC INT64CONST(1000) + /* * We allow 
numeric timezone offsets up to 15:59:59 either way from Greenwich. * Currently, the record holders for wackiest offsets in actual use are zones diff --git a/src/include/fmgr.h b/src/include/fmgr.h index d349510b7c..5cdf2e17cb 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum); #define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n)) #define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n)) #define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n)) +#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n)) /* use this if you want the raw, possibly-toasted input datum: */ #define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n)) /* use this if you want the input datum de-toasted: */ diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h index d6459327cc..4ac23da654 100644 --- a/src/include/portability/instr_time.h +++ b/src/include/portability/instr_time.h @@ -141,6 +141,9 @@ typedef struct timespec instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000)) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec)) + #else /* !HAVE_CLOCK_GETTIME */ /* Use gettimeofday() */ @@ -205,6 +208,10 @@ typedef struct timeval instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + \ + (uint64) (t).tv_usec * (uint64) 1000) + #endif /* HAVE_CLOCK_GETTIME */ #else /* WIN32 */ @@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ ((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency())) +#define INSTR_TIME_GET_NANOSEC(t) \ + ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency())) + static 
inline double GetTimerFrequency(void) { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 1ee9000b2b..aeaeb021ef 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -15,8 +15,10 @@ #define _PROC_H_ #include "access/clog.h" +#include "access/global_snapshot.h" #include "access/xlogdefs.h" #include "lib/ilist.h" +#include "utils/snapshot.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" @@ -57,6 +59,7 @@ struct XidCache #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical * decoding outside xact */ #define PROC_RESERVED 0x20 /* reserved for procarray */ +#define PROC_RESERVED2 0x40 /* reserved for procarray */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \ @@ -203,6 +206,18 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * assignedGlobalCsn holds GlobalCSN for this transaction. It is generated + * under a ProcArray lock and later is written to the GlobalCSNLog. This + * variable is defined as atomic only for case of group commit, in all other + * scenarios only backend responsible for this proc entry is working with + * this variable. + */ + GlobalCSN_atomic assignedGlobalCsn; + + /* Original xmin of this backend before global snapshot was imported */ + TransactionId originalXmin; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h.
*/ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a5c7d0c064..452ae5d547 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -36,6 +36,10 @@ #define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, * catalog_xmin */ + +#define PROCARRAY_NON_IMPORTED_XMIN 0x40 /* use originalXmin instead + * of xmin to properly + * maintain gsXidMap */ /* * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching * PGXACT->vacuumFlags. Other flags are used for different purposes and @@ -125,4 +129,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin); + +extern TransactionId ProcArrayGetGlobalSnapshotXmin(void); + #endif /* PROCARRAY_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index b28d13ce84..f4768bc6d4 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -127,6 +127,9 @@ extern void AtSubCommit_Snapshot(int level); extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin); +extern GlobalCSN ExportGlobalSnapshot(void); +extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn); + extern void ImportSnapshot(const char *idstr); extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 57d2dfaa67..71c92c69f4 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -204,6 +204,14 @@ typedef struct SnapshotData TimestampTz whenTaken; /* timestamp when snapshot was taken */ XLogRecPtr lsn; /* position in the WAL stream when taken */ + + /* + * GlobalCSN for cross-node snapshot isolation support. + * Will be used only if track_global_snapshots is enabled. 
+ */ + GlobalCSN global_csn; + /* Did we have our own global_csn or imported one from different node */ + bool imported_global_csn; } SnapshotData; #endif /* SNAPSHOT_H */ -- 2.25.1