diff --git a/src/backend/access/transam/csn_snapshot.c b/src/backend/access/transam/csn_snapshot.c index cedce60a6f..19106cd93a 100644 --- a/src/backend/access/transam/csn_snapshot.c +++ b/src/backend/access/transam/csn_snapshot.c @@ -50,10 +50,62 @@ typedef struct static CSNSnapshotState *csnState; + /* - * Enables this module. + * GUC to delay advance of oldestXid for this amount of time. Also determines + * the size CSNSnapshotXidMap circular buffer. */ -extern bool enable_csn_snapshot; +int csn_snapshot_defer_time; + + +/* + * CSNSnapshotXidMap + * + * To be able to install csn snapshot that points to past we need to keep + * old versions of tuples and therefore delay advance of oldestXid. Here we + * keep track of correspondence between snapshot's snapshot_csn and oldestXid + * that was set at the time when the snapshot was taken. Much like the + * snapshot too old's OldSnapshotControlData does, but with finer granularity + * to seconds. + * + * Different strategies can be employed to hold oldestXid (e.g. we can track + * oldest csn-based snapshot among cluster nodes and map it oldestXid + * on each node). + * + * On each snapshot acquisition CSNSnapshotMapXmin() is called and stores + * correspondence between current snapshot_csn and oldestXmin in a sparse way: + * snapshot_csn is rounded to seconds (and here we use the fact that snapshot_csn + * is just a timestamp) and oldestXmin is stored in the circular buffer where + * rounded snapshot_csn acts as an offset from current circular buffer head. + * Size of the circular buffer is controlled by csn_snapshot_defer_time GUC. + * + * When csn snapshot arrives we check that its + * snapshot_csn is still in our map, otherwise we'll error out with "snapshot too + * old" message. If snapshot_csn is successfully mapped to oldestXid we move + * backend's pgxact->xmin to proc->originalXmin and fill pgxact->xmin to + * mapped oldestXid. That way GetOldestXmin() can take into account backends + * with imported csn snapshot and old tuple versions will be preserved. + * + * Also while calculating oldestXmin for our map in presence of imported + * csn snapshots we should use proc->originalXmin instead of pgxact->xmin + * that was set during import. Otherwise, we can create a feedback loop: + * xmin's of imported csn snapshots were calculated using our map and new + * entries in map going to be calculated based on that xmin's, and there is + * a risk to stuck forever with one non-increasing oldestXmin. All other + * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions + * are preserved. + */ +typedef struct CSNSnapshotXidMap +{ + int head; /* offset of current freshest value */ + int size; /* total size of circular buffer */ + CSN_atomic last_csn_seconds; /* last rounded csn that changed + * xmin_by_second[] */ + TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */ +} +CSNSnapshotXidMap; + +static CSNSnapshotXidMap *csnXidMap; /* Estimate shared memory space needed */ @@ -64,25 +116,249 @@ CSNSnapshotShmemSize(void) size += MAXALIGN(sizeof(CSNSnapshotState)); + if (csn_snapshot_defer_time > 0) + { + size += sizeof(CSNSnapshotXidMap); + size += csn_snapshot_defer_time*sizeof(TransactionId); + size = MAXALIGN(size); + } + return size; } /* Init shared memory structures */ void -CSNSnapshotShmemInit() +CSNSnapshotShmemInit(void) { bool found; - csnState = ShmemInitStruct("csnState", - sizeof(CSNSnapshotState), - &found); - if (!found) + if (true) + { + csnState = ShmemInitStruct("csnState", + sizeof(CSNSnapshotState), + &found); + if (!found) + { + csnState->last_max_csn = 0; + csnState->last_csn_log_wal = 0; + csnState->xmin_for_csn = InvalidTransactionId; + SpinLockInit(&csnState->lock); + } + } + + if (csn_snapshot_defer_time > 0) + { + csnXidMap = ShmemInitStruct("csnXidMap", + sizeof(CSNSnapshotXidMap), + &found); + if (!found) + { + int i; + + pg_atomic_init_u64(&csnXidMap->last_csn_seconds, 0); + csnXidMap->head = 0; + csnXidMap->size = csn_snapshot_defer_time; + csnXidMap->xmin_by_second = + ShmemAlloc(sizeof(TransactionId)*csnXidMap->size); + + for (i = 0; i < csnXidMap->size; i++) + csnXidMap->xmin_by_second[i] = InvalidTransactionId; + } + } +} + +/* + * CSNSnapshotStartup + * + * Set csnXidMap entries to oldestActiveXID during startup. + */ +void +CSNSnapshotStartup(TransactionId oldestActiveXID) +{ + /* + * Run only if we have initialized shared memory and csnXidMap + * is enabled. + */ + if (IsNormalProcessingMode() && + enable_csn_snapshot && csn_snapshot_defer_time > 0) + { + int i; + + Assert(TransactionIdIsValid(oldestActiveXID)); + for (i = 0; i < csnXidMap->size; i++) + csnXidMap->xmin_by_second[i] = oldestActiveXID; + ProcArraySetCSNSnapshotXmin(oldestActiveXID); + } +} + +/* + * CSNSnapshotMapXmin + * + * Maintain circular buffer of oldestXmins for several seconds in past. This + * buffer allows to shift oldestXmin in the past when backend is importing + * CSN snapshot. Otherwise old versions of tuples that were needed for + * this transaction can be recycled by other processes (vacuum, HOT, etc). + * + * Locking here is not trivial. Called upon each snapshot creation after + * ProcArrayLock is released. Such usage creates several race conditions. It + * is possible that backend who got csn called CSNSnapshotMapXmin() + * only after other backends managed to get snapshot and complete + * CSNSnapshotMapXmin() call, or even committed. This is safe because + * + * * We already hold our xmin in MyPgXact, so our snapshot will not be + * harmed even though ProcArrayLock is released. + * + * * snapshot_csn is always pessmistically rounded up to the next + * second. + * + * * For performance reasons, xmin value for particular second is filled + * only once. Because of that instead of writing to buffer just our + * xmin (which is enough for our snapshot), we bump oldestXmin there -- + * it mitigates the possibility of damaging someone else's snapshot by + * writing to the buffer too advanced value in case of slowness of + * another backend who generated csn earlier, but didn't manage to + * insert it before us. + * + * * if CSNSnapshotMapXmin() founds a gap in several seconds between + * current call and latest completed call then it should fill that gap + * with latest known values instead of new one. Otherwise it is + * possible (however highly unlikely) that this gap also happend + * between taking snapshot and call to CSNSnapshotMapXmin() for some + * backend. And we are at risk to fill circullar buffer with + * oldestXmin's that are bigger then they actually were. + */ +void +CSNSnapshotMapXmin(SnapshotCSN snapshot_csn) +{ + int offset, gap, i; + SnapshotCSN csn_seconds; + SnapshotCSN last_csn_seconds; + volatile TransactionId oldest_deferred_xmin; + TransactionId current_oldest_xmin, previous_oldest_xmin; + + /* Callers should check config values */ + Assert(csn_snapshot_defer_time > 0); + Assert(csnXidMap != NULL); + /* + * Round up snapshot_csn to the next second -- pessimistically and safely. + */ + csn_seconds = (snapshot_csn / NSECS_PER_SEC + 1); + + /* + * Fast-path check. Avoid taking exclusive CSNSnapshotXidMapLock lock + * if oldestXid was already written to xmin_by_second[] for this rounded + * snapshot_csn. + */ + if (pg_atomic_read_u64(&csnXidMap->last_csn_seconds) >= csn_seconds) + return; + + /* Ok, we have new entry (or entries) */ + LWLockAcquire(CSNSnapshotXidMapLock, LW_EXCLUSIVE); + + /* Re-check last_csn_seconds under lock */ + last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds); + if (last_csn_seconds >= csn_seconds) + { + LWLockRelease(CSNSnapshotXidMapLock); + return; + } + pg_atomic_write_u64(&csnXidMap->last_csn_seconds, csn_seconds); + + /* + * Count oldest_xmin. + * + * It was possible to calculate oldest_xmin during corresponding snapshot + * creation, but GetSnapshotData() intentionally reads only PgXact, but not + * PgProc. And we need info about originalXmin (see comment to csnXidMap) + * which is stored in PgProc because of threats in comments around PgXact + * about extending it with new fields. So just calculate oldest_xmin again, + * that anyway happens quite rarely. + */ + current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN); + + previous_oldest_xmin = csnXidMap->xmin_by_second[csnXidMap->head]; + + Assert(TransactionIdIsNormal(current_oldest_xmin)); + Assert(TransactionIdIsNormal(previous_oldest_xmin) || !enable_csn_snapshot); + + gap = csn_seconds - last_csn_seconds; + offset = csn_seconds % csnXidMap->size; + + /* Sanity check before we update head and gap */ + Assert( gap >= 1 ); + Assert( (csnXidMap->head + gap) % csnXidMap->size == offset ); + + gap = gap > csnXidMap->size ? csnXidMap->size : gap; + csnXidMap->head = offset; + + /* Fill new entry with current_oldest_xmin */ + csnXidMap->xmin_by_second[offset] = current_oldest_xmin; + + /* + * If we have gap then fill it with previous_oldest_xmin for reasons + * outlined in comment above this function. + */ + for (i = 1; i < gap; i++) + { + offset = (offset + csnXidMap->size - 1) % csnXidMap->size; + csnXidMap->xmin_by_second[offset] = previous_oldest_xmin; + } + + oldest_deferred_xmin = + csnXidMap->xmin_by_second[ (csnXidMap->head + 1) % csnXidMap->size ]; + + LWLockRelease(CSNSnapshotXidMapLock); + + /* + * Advance procArray->csn_snapshot_xmin after we released + * CSNSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it + * never goes backwards regardless of how slow we can do that. + */ + Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin, + ProcArrayGetCSNSnapshotXmin())); + ProcArraySetCSNSnapshotXmin(oldest_deferred_xmin); +} + +/* + * CSNSnapshotToXmin + * + * Get oldestXmin that took place when snapshot_csn was taken. + */ +TransactionId +CSNSnapshotToXmin(SnapshotCSN snapshot_csn) +{ + TransactionId xmin; + SnapshotCSN csn_seconds; + volatile SnapshotCSN last_csn_seconds; + + /* Callers should check config values */ + Assert(csn_snapshot_defer_time > 0); + Assert(csnXidMap != NULL); + + /* Round down to get conservative estimates */ + csn_seconds = (snapshot_csn / NSECS_PER_SEC); + + LWLockAcquire(CSNSnapshotXidMapLock, LW_SHARED); + last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds); + if (csn_seconds > last_csn_seconds) + { + /* we don't have entry for this snapshot_csn yet, return latest known */ + xmin = csnXidMap->xmin_by_second[csnXidMap->head]; + } + else if (last_csn_seconds - csn_seconds < csnXidMap->size) { - csnState->last_max_csn = 0; - csnState->last_csn_log_wal = 0; - csnState->xmin_for_csn = InvalidTransactionId; - SpinLockInit(&csnState->lock); + /* we are good, retrieve value from our map */ + Assert(last_csn_seconds % csnXidMap->size == csnXidMap->head); + xmin = csnXidMap->xmin_by_second[csn_seconds % csnXidMap->size]; } + else + { + /* requested snapshot_csn is too old, let caller know */ + xmin = InvalidTransactionId; + } + LWLockRelease(CSNSnapshotXidMapLock); + + return xmin; } /* @@ -99,7 +375,7 @@ GenerateCSN(bool locked) instr_time current_time; SnapshotCSN csn; - Assert(get_csnlog_status()); + Assert(get_csnlog_status() || csn_snapshot_defer_time > 0); /* * TODO: create some macro that add small random shift to current time. @@ -124,6 +400,125 @@ GenerateCSN(bool locked) return csn; } +/* + * CSNSnapshotPrepareCurrent + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +SnapshotCSN +CSNSnapshotPrepareCurrent(void) +{ + TransactionId xid = GetCurrentTransactionIdIfAny(); + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (TransactionIdIsValid(xid)) + { + TransactionId *subxids; + int nsubxids = xactGetCommittedChildren(&subxids); + CSNLogSetCSN(xid, nsubxids, subxids, InDoubtXidCSN, true); + } + + /* Nothing to write if we don't heve xid */ + + return GenerateCSN(false); +} + + +/* + * CSNSnapshotAssignCsnCurrent + * + * Asign SnapshotCSN for currently active transaction. SnapshotCSN is supposedly + * maximal among of values returned by CSNSnapshotPrepareCurrent and + * pg_global_snapshot_prepare. + */ +void +CSNSnapshotAssignCsnCurrent(SnapshotCSN snapshot_csn) +{ + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (!XidCSNIsNormal(snapshot_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snapshot_assign expects normal snapshot_csn"))); + + /* Skip emtpty transactions */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return; + + /* Set global_csn and defuse ProcArrayEndTransaction from assigning one */ + pg_atomic_write_u64(&MyProc->assignedXidCsn, snapshot_csn); +} + +/* + * CSNSnapshotSync + * + * Due to time desynchronization on different nodes we can receive snapshot_csn + * which is greater than snapshot_csn on this node. To preserve proper isolation + * this node needs to wait when such snapshot_csn comes on local clock. + * + * This should happend relatively rare if nodes have running NTP/PTP/etc. + * Complain if wait time is more than SNAP_SYNC_COMPLAIN. + */ +void +CSNSnapshotSync(SnapshotCSN remote_csn) +{ + SnapshotCSN local_csn; + SnapshotCSN delta; + + Assert(enable_csn_snapshot); + + for(;;) + { + SpinLockAcquire(&csnState->lock); + if (csnState->last_max_csn > remote_csn) + { + /* Everything is fine */ + SpinLockRelease(&csnState->lock); + return; + } + else if ((local_csn = GenerateCSN(true)) >= remote_csn) + { + /* + * Everything is fine too, but last_max_csn wasn't updated for + * some time. + */ + SpinLockRelease(&csnState->lock); + return; + } + SpinLockRelease(&csnState->lock); + + /* Okay we need to sleep now */ + delta = remote_csn - local_csn; + if (delta > SNAP_DESYNC_COMPLAIN) + ereport(WARNING, + (errmsg("remote global snapshot exceeds ours by more than a second"), + errhint("Consider running NTPd on servers participating in global transaction"))); + + /* TODO: report this sleeptime somewhere? */ + pg_usleep((long) (delta/NSECS_PER_USEC)); + + /* + * Loop that checks to ensure that we actually slept for specified + * amount of time. + */ + } + + Assert(false); /* Should not happend */ + return; +} + /* * TransactionIdGetXidCSN * diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 57bda5d422..7f90520beb 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2469,3 +2469,128 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * CSNSnapshotPrepareTwophase + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +static SnapshotCSN +CSNSnapshotPrepareTwophase(const char *gid) +{ + GlobalTransaction gxact; + PGXACT *pgxact; + char *buf; + TransactionId xid; + xl_xact_parsed_prepare parsed; + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + xid = pgxact->xid; + + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + + ParsePrepareRecord(0, (xl_xact_prepare *)buf, &parsed); + + CSNLogSetCSN(xid, parsed.nsubxacts, + parsed.subxacts, InDoubtXidCSN, true); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + + pfree(buf); + + return GenerateCSN(false); +} + +/* + * TwoPhaseAssignGlobalCsn + * + * Asign SnapshotCSN for currently active transaction. SnapshotCSN is supposedly + * maximal among of values returned by CSNSnapshotPrepareCurrent and + * pg_global_snapshot_prepare. + * + * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for + * twophase transactions. + */ +static void +CSNSnapshotAssignCsnTwoPhase(const char *gid, SnapshotCSN snapshot_csn) +{ + GlobalTransaction gxact; + PGPROC *proc; + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (!XidCSNIsNormal(snapshot_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_csn_snapshot_assign expects normal snapshot_csn"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + + /* Set snapshot_csn and defuse ProcArrayRemove from assigning one. */ + pg_atomic_write_u64(&proc->assignedXidCsn, snapshot_csn); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); +} + +/* + * SQL interface to CSNSnapshotPrepareTwophase() + * + * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT + */ +Datum +pg_csn_snapshot_prepare(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + SnapshotCSN snapshot_csn; + + snapshot_csn = CSNSnapshotPrepareTwophase(gid); + + PG_RETURN_INT64(snapshot_csn); +} + +/* + * SQL interface to CSNSnapshotAssignCsnTwoPhase() + * + * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn' + */ +Datum +pg_csn_snapshot_assign(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + SnapshotCSN snapshot_csn = PG_GETARG_INT64(1); + + CSNSnapshotAssignCsnTwoPhase(gid, snapshot_csn); + PG_RETURN_VOID(); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 32f1e614b4..0a1c7d8615 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7071,6 +7071,7 @@ StartupXLOG(void) */ StartupCLOG(); StartupSUBTRANS(oldestActiveXID); + CSNSnapshotStartup(oldestActiveXID); /* * If we're beginning at a shutdown checkpoint, we know that @@ -7888,6 +7889,7 @@ StartupXLOG(void) { StartupCLOG(); StartupSUBTRANS(oldestActiveXID); + CSNSnapshotStartup(oldestActiveXID); } /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e326b431c2..b36a85cd01 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -96,6 +96,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* xmin of oldest active csn snapshot */ + TransactionId csn_snapshot_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -251,6 +253,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->csn_snapshot_xmin = InvalidTransactionId; } allProcs = ProcGlobal->allProcs; @@ -442,6 +445,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -464,6 +469,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -630,6 +637,7 @@ ProcArrayClearTransaction(PGPROC *proc) pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; proc->recoveryConflictPending = false; /* redundant, but just in case */ @@ -1332,6 +1340,7 @@ GetOldestXmin(Relation rel, int flags) TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId csn_snapshot_xmin = InvalidTransactionId; /* * If we're not computing a relation specific limit, or if a shared @@ -1370,6 +1379,7 @@ GetOldestXmin(Relation rel, int flags) { /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid); + TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin); /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && @@ -1382,8 +1392,17 @@ GetOldestXmin(Relation rel, int flags) * We must check both Xid and Xmin because a transaction might * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. + * + * In case of oldestXmin calculation for CSNSnapshotMapXmin() + * pgxact->xmin should be changed to proc->originalXmin. Details + * in commets to CSNSnapshotMapXmin. */ - xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if ((flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(original_xmin)) + xid = original_xmin; + else + xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; @@ -1397,6 +1416,7 @@ GetOldestXmin(Relation rel, int flags) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + csn_snapshot_xmin = procArray->csn_snapshot_xmin; if (RecoveryInProgress()) { @@ -1438,6 +1458,11 @@ GetOldestXmin(Relation rel, int flags) result = FirstNormalTransactionId; } + if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(csn_snapshot_xmin) && + NormalTransactionIdPrecedes(csn_snapshot_xmin, result)) + result = csn_snapshot_xmin; + /* * Check whether there are replication slots requiring an older xmin. */ @@ -1535,6 +1560,7 @@ GetSnapshotData(Snapshot snapshot) XidCSN xid_csn = FrozenXidCSN; TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId csn_snapshot_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1726,6 +1752,7 @@ GetSnapshotData(Snapshot snapshot) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + csn_snapshot_xmin = procArray->csn_snapshot_xmin; if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; @@ -1752,6 +1779,10 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + if (TransactionIdIsValid(csn_snapshot_xmin) && + TransactionIdPrecedes(csn_snapshot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = csn_snapshot_xmin; + /* Check whether there's a replication slot requiring an older xmin. */ if (TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) @@ -1807,7 +1838,10 @@ GetSnapshotData(Snapshot snapshot) MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin); } + snapshot->imported_snapshot_csn = false; snapshot->snapshot_csn = xid_csn; + if (csn_snapshot_defer_time > 0 && IsUnderPostmaster) + CSNSnapshotMapXmin(snapshot->snapshot_csn); return snapshot; } @@ -3156,6 +3190,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetCSNSnapshotXmin + */ +void +ProcArraySetCSNSnapshotXmin(TransactionId xmin) +{ + /* We rely on atomic fetch/store of xid */ + procArray->csn_snapshot_xmin = xmin; +} + +/* + * ProcArrayGetCSNSnapshotXmin + */ +TransactionId +ProcArrayGetCSNSnapshotXmin(void) +{ + return procArray->csn_snapshot_xmin; +} #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 3c95ce4aac..e048a2276d 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -51,3 +51,4 @@ OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 XactTruncationLock 44 CSNLogControlLock 45 +CSNSnapshotXidMapLock 46 diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 2a31366930..2bfafa69c1 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -2394,3 +2394,98 @@ XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot) return false; } + + +/* + * ExportCSNSnapshot + * + * Export snapshot_csn so that caller can expand this transaction to other + * nodes. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +SnapshotCSN +ExportCSNSnapshot() +{ + if (!get_csnlog_status()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not export csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + return CurrentSnapshot->snapshot_csn; +} + +/* SQL accessor to ExportCSNSnapshot() */ +Datum +pg_csn_snapshot_export(PG_FUNCTION_ARGS) +{ + SnapshotCSN export_csn = ExportCSNSnapshot(); + PG_RETURN_UINT64(export_csn); +} + +/* + * ImportCSNSnapshot + * + * Import csn and retract this backends xmin to the value that was + * actual when we had such csn. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +void +ImportCSNSnapshot(SnapshotCSN snapshot_csn) +{ + volatile TransactionId xmin; + + if (!get_csnlog_status()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (csn_snapshot_defer_time <= 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is positive.", + "csn_snapshot_defer_time"))); + + /* + * Call CSNSnapshotToXmin under ProcArrayLock to avoid situation that + * resulting xmin will be evicted from map before we will set it into our + * backend's xmin. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = CSNSnapshotToXmin(snapshot_csn); + if (!TransactionIdIsValid(xmin)) + { + LWLockRelease(ProcArrayLock); + elog(ERROR, "CSNSnapshotToXmin: csn snapshot too old"); + } + MyProc->originalXmin = MyPgXact->xmin; + MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); + + CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */ + CurrentSnapshot->snapshot_csn = snapshot_csn; + CurrentSnapshot->imported_snapshot_csn = true; + CSNSnapshotSync(snapshot_csn); + + Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin)); + Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin)); +} + +/* SQL accessor to ImportCSNSnapshot() */ +Datum +pg_csn_snapshot_import(PG_FUNCTION_ARGS) +{ + SnapshotCSN snapshot_csn = PG_GETARG_UINT64(0); + ImportCSNSnapshot(snapshot_csn); + PG_RETURN_VOID(); +} \ No newline at end of file diff --git a/src/include/access/csn_snapshot.h b/src/include/access/csn_snapshot.h index a768f054f5..92bc9c77bf 100644 --- a/src/include/access/csn_snapshot.h +++ b/src/include/access/csn_snapshot.h @@ -38,11 +38,13 @@ typedef pg_atomic_uint64 CSN_atomic; #define CSNIsUnclear(csn) ((csn) == UnclearCSN) #define XidCSNIsNormal(csn) ((csn) >= FirstNormalXidCSN) - - +extern int csn_snapshot_defer_time; extern Size CSNSnapshotShmemSize(void); extern void CSNSnapshotShmemInit(void); +extern void CSNSnapshotStartup(TransactionId oldestActiveXID); +extern void CSNSnapshotMapXmin(SnapshotCSN snapshot_csn); +extern TransactionId CSNSnapshotToXmin(SnapshotCSN snapshot_csn); extern SnapshotCSN GenerateCSN(bool locked); @@ -56,5 +58,8 @@ extern void CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids, TransactionId *subxids); extern void CSNSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids, TransactionId *subxids); +extern void CSNSnapshotAssignCsnCurrent(SnapshotCSN snapshot_csn); +extern SnapshotCSN CSNSnapshotPrepareCurrent(void); +extern void CSNSnapshotSync(SnapshotCSN remote_csn); #endif /* CSN_SNAPSHOT_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 95604e988a..17e85486ae 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10953,4 +10953,18 @@ proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text', prosrc => 'unicode_is_normalized' }, +# csn shnapshot handling +{ oid => '4179', descr => 'export csn snapshot', + proname => 'pg_csn_snapshot_export', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_csn_snapshot_export' }, +{ oid => '4180', descr => 'import csn snapshot', + proname => 'pg_csn_snapshot_import', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_csn_snapshot_import' }, +{ oid => '4198', descr => 'prepare distributed transaction for commit, get global_csn', + proname => 'pg_csn_snapshot_prepare', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_csn_snapshot_prepare' }, +{ oid => '4199', descr => 'assign global_csn to distributed transaction', + proname => 'pg_csn_snapshot_assign', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_csn_snapshot_assign' }, + ] diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 3ff7ea4fce..30bcbbfe15 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -222,6 +222,8 @@ struct PGPROC */ CSN_atomic assignedXidCsn; + /* Original xmin of this backend before csn snapshot was imported */ + TransactionId originalXmin; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a5c7d0c064..35dc1dcc40 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -36,6 +36,9 @@ #define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, * catalog_xmin */ +#define PROCARRAY_NON_IMPORTED_XMIN 0x80 /* use originalXmin instead + * of xmin to properly + * maintain csnXidMap */ /* * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching * PGXACT->vacuumFlags. Other flags are used for different purposes and @@ -125,4 +128,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetCSNSnapshotXmin(TransactionId xmin); +extern TransactionId ProcArrayGetCSNSnapshotXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index ffb4ba3adf..0e37ebad07 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -127,6 +127,8 @@ extern void AtSubCommit_Snapshot(int level); extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin); +extern SnapshotCSN ExportCSNSnapshot(void); +extern void ImportCSNSnapshot(SnapshotCSN snapshot_csn); extern void ImportSnapshot(const char *idstr); extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 9f622c76a7..2eef33c4b6 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -210,6 +210,8 @@ typedef struct SnapshotData * Will be used only if enable_csn_snapshot is enabled. */ SnapshotCSN snapshot_csn; + /* Did we have our own snapshot_csn or imported one from different node */ + bool imported_snapshot_csn; } SnapshotData; #endif /* SNAPSHOT_H */