From 12b1228e8b6cf3040ba14a34253fcbb35fedff8f Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Wed, 22 Feb 2017 00:57:33 +0100 Subject: [PATCH 4/5] Fix xl_running_xacts usage in snapshot builder Due to race condition, the xl_running_xacts might contain no longer running transactions. Previous coding tried to get around this by additional locking but that did not work correctly for committs. Instead try combining decoded commits and multiple xl_running_xacts to get the consistent snapshot. This also reverts changes made to GetRunningTransactionData() and LogStandbySnapshot() by b89e151 as the additional locking does not help. --- src/backend/replication/logical/snapbuild.c | 71 ++++++++++++++++++++++++----- src/backend/storage/ipc/procarray.c | 5 +- src/backend/storage/ipc/standby.c | 19 -------- 3 files changed, 63 insertions(+), 32 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 1e8346d..f683c24 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1221,7 +1221,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * simply track the number of in-progress toplevel transactions and * lower it whenever one commits or aborts. When that number * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT - * to CONSISTENT. + * to CONSISTENT. Sometimes we might get xl_running_xacts which has + * all tracked transactions as finished. We'll need to restart tracking + * in that case and use previously collected committed transactions to + * purge transactions mistakenly marked as running in the + * xl_running_xacts which exist as a result of race condition in + * LogStandbySnapshot(). * NB: We need to search running.xip when seeing a transaction's end to * make sure it's a toplevel transaction and it's been one of the * initially running ones. @@ -1286,11 +1291,17 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * b) first encounter of a useable xl_running_xacts record. If we had * found one earlier we would either track running transactions (i.e. * builder->running.xcnt != 0) or be consistent (this function wouldn't - * get called). + * get called). However it's possible that we could not see all + * transactions that were marked as running in xl_running_xacts, so if + * we get new one that says all were closed but we are not consistent + * yet, we need to restart the tracking while taking previously seen + * transactions into account. */ - else if (!builder->running.xcnt) + else if (!builder->running.xcnt || + running->oldestRunningXid > builder->running.xmax) { int off; + bool first = builder->running.xcnt == 0; /* * We only care about toplevel xids as those are the ones we @@ -1326,20 +1337,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->running.xmin = builder->running.xip[0]; builder->running.xmax = builder->running.xip[running->xcnt - 1]; + /* makes comparisons cheaper later */ TransactionIdRetreat(builder->running.xmin); TransactionIdAdvance(builder->running.xmax); builder->state = SNAPBUILD_FULL_SNAPSHOT; - ereport(LOG, - (errmsg("logical decoding found initial starting point at %X/%X", - (uint32) (lsn >> 32), (uint32) lsn), - errdetail_plural("%u transaction needs to finish.", - "%u transactions need to finish.", - builder->running.xcnt, - (uint32) builder->running.xcnt))); - /* * Iterate through all xids, wait for them to finish. * @@ -1359,9 +1363,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn if (TransactionIdIsCurrentTransactionId(xid)) elog(ERROR, "waiting for ourselves"); + /* + * This isn't required for the correctness of decoding, but to allow + * isolationtester to notice that we're currently waiting for + * something. + */ XactLockTableWait(xid, NULL, NULL, XLTW_None); } + /* + * If this is the first time we've seen xl_running_xacts, we are done. + */ + if (first) + { + ereport(LOG, + (errmsg("logical decoding found initial starting point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail_plural("%u transaction needs to finish.", + "%u transactions need to finish.", + builder->running.xcnt, + (uint32) builder->running.xcnt))); + } + else + { + /* + * Because of the race condition in LogStandbySnapshot() the + * transactions recorded in xl_running_xacts as running might have + * already committed by the time the xl_running_xacts was written + * to WAL. Use the information about decoded transactions that we + * gathered so far to update our idea about what's still running. + * + * We can use SnapBuildEndTxn directly as it only does the + * transaction running check and handling without any additional + * side effects. + */ + for (off = 0; off < builder->committed.xcnt; off++) + SnapBuildEndTxn(builder, lsn, builder->committed.xip[off]); + if (builder->state == SNAPBUILD_CONSISTENT) + return false; + + ereport(LOG, + (errmsg("logical decoding moved initial starting point to %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail_plural("%u transaction needs to finish.", + "%u transactions need to finish.", + builder->running.xcnt, + (uint32) builder->running.xcnt))); + } + /* nothing could have built up so far, so don't perform cleanup */ return false; } diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index cd14667..4ea81f8 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2055,12 +2055,13 @@ GetRunningTransactionData(void) CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->latestCompletedXid = latestCompletedXid; + /* We don't release XidGenLock here, the caller is responsible for that */ + LWLockRelease(ProcArrayLock); + Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid)); Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid)); Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid)); - /* We don't release the locks here, the caller is responsible for that */ - return CurrentRunningXacts; } diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 6259070..f461f21 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -933,27 +933,8 @@ LogStandbySnapshot(void) */ running = GetRunningTransactionData(); - /* - * GetRunningTransactionData() acquired ProcArrayLock, we must release it. - * For Hot Standby this can be done before inserting the WAL record - * because ProcArrayApplyRecoveryInfo() rechecks the commit status using - * the clog. For logical decoding, though, the lock can't be released - * early because the clog might be "in the future" from the POV of the - * historic snapshot. This would allow for situations where we're waiting - * for the end of a transaction listed in the xl_running_xacts record - * which, according to the WAL, has committed before the xl_running_xacts - * record. Fortunately this routine isn't executed frequently, and it's - * only a shared lock. - */ - if (wal_level < WAL_LEVEL_LOGICAL) - LWLockRelease(ProcArrayLock); - recptr = LogCurrentRunningXacts(running); - /* Release lock if we kept it longer ... */ - if (wal_level >= WAL_LEVEL_LOGICAL) - LWLockRelease(ProcArrayLock); - /* GetRunningTransactionData() acquired XidGenLock, we must release it */ LWLockRelease(XidGenLock); -- 2.7.4