From 70bfc94323ab6252f9046c3faccbcd0de5faa360 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Thu, 5 Apr 2018 19:40:20 +0530 Subject: [PATCH 2/5] Introduce LogicalLockTransaction/LogicalUnlockTransaction APIs When a transaction aborts, it's changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions this is not an issue, and we never decode transactions that abort before the decoding starts (where decoding means passing it to ReorderBufferCommit). But for in-progress transactions - for example when decoding prepared transactions on PREPARE (and not COMMIT PREPARED as before), this may cause failures when the output plugin consults catalogs (both system and user-defined). To prevent aborts concurrent with plugins accessing catalogs, we introduce an API the output plugins are required to use (when decoding in-progress transactions only). Before accessing any catalogs, output plugins are required to call LogicalLockTransaction and then release it using LogicalUnlockTransaction. Implementation is via adding support for decoding groups. Use LockHashPartitionLockByProc on the group leader to get the LWLock protecting these fields. For prepared and uncommitted transactions, decoding backends working on the same XID will link themselves up to the corresponding PGPROC entry (decodeGroupLeader). They will remove themselves when they are done decoding. If the prepared or uncommitted transaction decides to abort, then the decodeGroupLeader will set the decodeAbortPending flag allowing the decodeGroupMembers to abort their decoding appropriately. If any of the decode group members errors out then also we remove that proc from the membership appropriately. --- src/backend/replication/logical/logical.c | 242 ++++++++++++++++ src/backend/storage/ipc/procarray.c | 39 +++ src/backend/storage/lmgr/README | 46 ++++ src/backend/storage/lmgr/proc.c | 442 +++++++++++++++++++++++++++++- src/include/replication/logical.h | 2 + src/include/replication/reorderbuffer.h | 15 + src/include/storage/proc.h | 26 ++ src/include/storage/procarray.h | 1 + 8 files changed, 804 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3d8ad7ddf8..9bb382bb97 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1017,3 +1017,245 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * LogicalLockTransaction + * Make sure the transaction is not aborted during decoding. + * + * The logical decoding plugins may need to access catalogs (both system + * and user-defined), e.g. to get metadata about tuples, do custom + * filtering etc. While decoding committed transactions that is not an + * issue, but in-progress transactions may abort while being decoded, in + * which case the catalog access may fail in various ways (rows from + * aborted transactions are eligible for more aggressive cleanup, may + * not be accessible through indexes due to breaking HOT chains etc.). + * + * To prevent these issues, we need to prevent abort of the transaction + * while accessing any catalogs. To enforce that, each decoding backend + * has to call LogicalLockTransaction prior to any catalog access, and + * then LogicalUnlockTransaction immediately after it. The lock function + * adds the decoding backend into a "decoding group" for the transaction + * on the first call. Subsequent calls update a flag indicating whether + * the decoding backend may be accessing any catalogs. + * + * While aborting an in-progress transaction, the backend is made to wait + * for all current members of the decoding group that may be currently + * accessing catalogs (see LogicalDecodeRemoveTransaction). Once the + * transaction completes (applies to both abort and commit), the group + * is destroyed and is not needed anymore (we can check transaction + * status directly, instead). + * + * The function returns true when it's safe to access catalogs, and + * false when the transaction aborted (or is being aborted), in which + * case the plugin should stop decoding it. + * + * The decoding backend joins the decoding group only when actually + * needed. For example when the transaction did no catalog changes, + * or when it's known to already have committed (or aborted), we can + * bail out without joining the group. + */ +bool +LogicalLockTransaction(ReorderBufferTXN *txn) +{ + bool ok = false; + LWLock *leader_lwlock; + volatile PGPROC *leader = NULL; + volatile PGXACT *pgxact = NULL; + + /* + * Transactions that have not modified catalogs do not need to + * join the decoding group. + */ + if (!rbtxn_has_catalog_changes(txn)) + return true; + + /* + * Check commit status. If a transaction already committed, there + * is no danger when accessing catalogs. If it aborted, we can + * stop decoding it right away. + */ + if (rbtxn_commit(txn)) + return true; + + if (rbtxn_rollback(txn)) + return false; + + /* + * Currently, only 2PC transactions can be decoded before commit + * (at prepare). So regular transactions are automatically safe. + */ + if (!rbtxn_prepared(txn)) + return true; + + /* + * Find the PROC handling this XID and join the decoding group. + * + * If this is the first call for this XID, we don't know which + * PROC is executing the transaction (and acting as a leader). + * In that case we need to lookup and possibly also assign + * the leader. + */ + if (MyProc->decodeGroupLeader == NULL) + { + leader = AssignDecodeGroupLeader(txn->xid); + + /* + * We have checked if the transaction committed/aborted, but it + * is possible the PROC went away since then, in which case we + * get leader as NULL above. We recheck transaction status, + * expecting it to be either committed or aborted. + * + * If the PROC is available, add ourself as a member of its + * decoding group. Note that we're not holding any locks on PGPROC, + * so it's possible the leader disappears, or starts executing + * another transaction. In that case we're done. + */ + if (leader == NULL || + !BecomeDecodeGroupMember((PGPROC *)leader, txn->xid)) + goto lock_cleanup; + } + + /* + * We know the leader was executing this XID a while ago, and we + * might have become a member of the decode group as well. + * But we have not been holding any locks on PGPROC so it might + * have committed/aborted, removed us from the decoding group and + * started executing something else since then. So we need to + * recheck that it is indeed still running the right XID. + */ + leader = BackendXidGetProc(txn->xid); + if (!leader) + goto lock_cleanup; + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + + pgxact = &ProcGlobal->allPgXact[leader->pgprocno]; + if(pgxact->xid != txn->xid) + { + LWLockRelease(leader_lwlock); + goto lock_cleanup; + } + + /* ok, we are part of the decode group still */ + Assert(MyProc->decodeGroupLeader && + MyProc->decodeGroupLeader == leader); + + /* + * Re-check if we were told to abort by the leader after taking + * the above lock. + */ + if (leader->decodeAbortPending) + { + /* + * Remove ourself from the decodeGroupMembership and return + * false so that the decoding plugin also initiates abort + * processing + */ + RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader); + MyProc->decodeLocked = false; + txn->txn_flags |= RBTXN_ROLLBACK; + ok = false; + } + else + { + /* ok to logically lock this backend */ + MyProc->decodeLocked = true; + ok = true; + } + LWLockRelease(leader_lwlock); + + return ok; + + /* + * if we reach lock_cleanup label, then lock was not granted. + * Check XID status and update txn flags appropriately before + * returning + */ +lock_cleanup: + Assert(!TransactionIdIsInProgress(txn->xid)); + if (TransactionIdDidCommit(txn->xid)) + { + txn->txn_flags |= RBTXN_COMMIT; + return true; + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK; + return false; + } +} + +/* + * LogicalUnlockTransaction + * Indicate that the logical decoding plugin is done accessing + * catalog information. + * + * + * To prevent issues while decoding of in-progress transactions, we + * need to prevent abort of the transaction while accessing any catalogs. + * To enforce that, each decoding backend has to call + * LogicalLockTransaction prior to any catalog access, and then + * LogicalUnlockTransaction immediately after it. This unlock function + * removes the decoding backend from a "decoding group" for a given + * transaction. + */ +void +LogicalUnlockTransaction(ReorderBufferTXN *txn) +{ + LWLock *leader_lwlock; + PGPROC *leader = NULL; + + /* + * If the transaction is known to have aborted, we should have never got + * here (the plugin should have interrupted the decoding). + */ + Assert(!rbtxn_rollback(txn)); + + /* If it's not locked, we're done. */ + if (!MyProc->decodeLocked) + return; + + /* + * Transactions that have not modified catalogs do not need to + * join the decoding group. + */ + if (!rbtxn_has_catalog_changes(txn)) + return; + + /* + * Currently, only 2PC transactions can be decoded before commit + * (at prepare). So regular transactions are automatically safe. + */ + if (!rbtxn_prepared(txn)) + return; + + /* + * Check commit status. If a transaction already committed, there + * is no danger when accessing catalogs. + */ + if (rbtxn_commit(txn)) + return; + + /* + * We're guaranteed to still have a leader here, because we are + * in locked mode, so the leader can't just disappear. + */ + leader = MyProc->decodeGroupLeader; + Assert(leader && MyProc->decodeLocked); + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + if (leader->decodeAbortPending) + { + /* + * Remove ourself from the decodeGroupMembership + */ + RemoveDecodeGroupMemberLocked(leader); + + txn->txn_flags |= RBTXN_ROLLBACK; + } + MyProc->decodeLocked = false; + LWLockRelease(leader_lwlock); + return; +} diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index afe1c03aa3..2be2910207 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2432,6 +2432,45 @@ BackendXidGetPid(TransactionId xid) return result; } +/* + * BackendXidGetProc -- get a backend's PGPROC given its XID + * + * Note that it is up to the caller to be sure that the question + * remains meaningful for long enough for the answer to be used ... + * + * Only main transaction Ids are considered. + * + */ +PGPROC * +BackendXidGetProc(TransactionId xid) +{ + PGPROC *result = NULL; + ProcArrayStruct *arrayP = procArray; + int index; + + if (xid == InvalidTransactionId) /* never match invalid xid */ + return 0; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; + volatile PGXACT *pgxact = &allPgXact[pgprocno]; + + if (pgxact->xid == xid) + { + result = proc; + break; + } + } + + LWLockRelease(ProcArrayLock); + + return result; +} + /* * IsBackendPid -- is a given pid a running backend * diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README index 56b0a12a3e..4b4b9c5958 100644 --- a/src/backend/storage/lmgr/README +++ b/src/backend/storage/lmgr/README @@ -679,6 +679,52 @@ worker, and the worker fails to join the lock group unless the given PGPROC still has the same PID and is still a lock group leader. We assume that PIDs are not recycled quickly enough for this interlock to fail. +Decode Group Locking +-------------------- + +When decoding in-progress transactions, we need to prevent aborts while +the decoding processes are accessing catalogs, which might lead to issues +if the transaction modified some of the catalogs. Currently this applies +only to two-phase transactions, that may be decoded at PREPARE time, but +in the future this may be extended to regular transactions too. + +To prevent that, the backend executing the abort is made to wait for all +the decoding backends. We use an infrastructure which is very similar +to the above group locking to form groups of backends performing logical +decoding of the same in-progress transaction. + +Decode Group locking adds five new members to each PGPROC: +decodeGroupLeader, decodeGroupMembers, decodeGroupLink, decodeLocked and +decodeAbortPending. A PGPROC's decodeGroupLeader is NULL for processes +not involved in logical decoding. When a process wants to decode an +in-progress transaction then it finds out the PGPROC structure which is +associated with that transaction ID and makes that PGPROC structure as +its decodeGroupLeader. The decodeGroupMembers field is only used in the +leader; it is a list of the member PGPROCs of the decode group (the +leader and all backends decoding this transaction ID). +The decodeGroupLink field is the list link for this list. The decoding +backend marks itself as decodeLocked while it is accessing catalog +metadata for its decoding requirements via the LogicalLockTransaction +API. It resets the same via the LogicalUnlockTransaction API. + +Meanwhile, if the transaction ID of this in-progress transaction decides +to abort, then the PGPROC corresponding to it sets decodeAbortPending +on itself and also on all the decodeGroupMembers entries. + +The decodeGroupMembers entries stop decoding this transaction and exit. +When all the decoding backends have exited the abort can proceed. + +All five of these fields are considered to be protected by a lock manager +partition lock. The partition lock that protects these fields within a given +lock group is chosen by taking the leader's pgprocno modulo the number of lock +manager partitions. Holding this single lock allows safe manipulation of the +decodeGroupMembers list for the lock group. + +The decodeGroupLeader's PGPROC and also its PID is accessible to each +decoding backend. And the decoding backend fails to join the decode +lock group unless the given PGPROC still has the same PID and is still +a decode group leader. We assume that PIDs are not recycled quickly +enough for this interlock to fail. User Locks (Advisory Locks) --------------------------- diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f30e082b2..82a2450319 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -267,6 +267,11 @@ InitProcGlobal(void) /* Initialize lockGroupMembers list. */ dlist_init(&procs[i].lockGroupMembers); + + /* Initialize decodeGroupMembers list. */ + dlist_init(&procs[i].decodeGroupMembers); + procs[i].decodeAbortPending = false; + procs[i].decodeLocked = false; } /* @@ -406,6 +411,12 @@ InitProcess(void) Assert(MyProc->lockGroupLeader == NULL); Assert(dlist_is_empty(&MyProc->lockGroupMembers)); + /* Check that group decode fields are in a proper initial state. */ + Assert(MyProc->decodeGroupLeader == NULL); + Assert(dlist_is_empty(&MyProc->decodeGroupMembers)); + MyProc->decodeAbortPending = false; + MyProc->decodeLocked = false; + /* Initialize wait event information. */ MyProc->wait_event_info = 0; @@ -581,6 +592,12 @@ InitAuxiliaryProcess(void) Assert(MyProc->lockGroupLeader == NULL); Assert(dlist_is_empty(&MyProc->lockGroupMembers)); + /* Check that group decode fields are in a proper initial state. */ + Assert(MyProc->decodeGroupLeader == NULL); + Assert(dlist_is_empty(&MyProc->decodeGroupMembers)); + MyProc->decodeAbortPending = false; + MyProc->decodeLocked = false; + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly @@ -826,9 +843,14 @@ ProcKill(int code, Datum arg) /* * Detach from any lock group of which we are a member. If the leader - * exist before all other group members, it's PGPROC will remain allocated + * exits before all other group members, its PGPROC will remain allocated * until the last group process exits; that process must return the * leader's PGPROC to the appropriate list. + * + * The below code needs to be mindful of the presence of decode group + * entries in case of logical decoding. However, lock groups are for + * parallel workers so we typically won't be finding both present + * together in the same proc. */ if (MyProc->lockGroupLeader != NULL) { @@ -845,11 +867,19 @@ ProcKill(int code, Datum arg) { procgloballist = leader->procgloballist; - /* Leader exited first; return its PGPROC. */ - SpinLockAcquire(ProcStructLock); - leader->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = leader; - SpinLockRelease(ProcStructLock); + /* + * Leader exited first; return its PGPROC. + * Only do this if it does not have any decode + * group members though. Otherwise that will + * release it later + */ + if (leader->decodeGroupLeader == NULL) + { + SpinLockAcquire(ProcStructLock); + leader->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = leader; + SpinLockRelease(ProcStructLock); + } } } else if (leader != MyProc) @@ -857,6 +887,54 @@ ProcKill(int code, Datum arg) LWLockRelease(leader_lwlock); } + /* + * Detach from any decode group of which we are a member. If the leader + * exits before all other group members, its PGPROC will remain allocated + * until the last group process exits; that process must return the + * leader's PGPROC to the appropriate list. + */ + if (MyProc->decodeGroupLeader != NULL) + { + PGPROC *leader = MyProc->decodeGroupLeader; + LWLock *leader_lwlock = LockHashPartitionLockByProc(leader); + + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + dlist_delete(&MyProc->decodeGroupLink); + if (dlist_is_empty(&leader->decodeGroupMembers)) + { + leader->decodeGroupLeader = NULL; + if (leader != MyProc) + { + procgloballist = leader->procgloballist; + + /* + * Leader exited first; return its PGPROC. + * But check if it was already done above + * by the lockGroup code + */ + if (leader != *procgloballist) + { + SpinLockAcquire(ProcStructLock); + leader->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = leader; + SpinLockRelease(ProcStructLock); + } + } + /* clear leader flags */ + leader->decodeAbortPending = false; + leader->decodeLocked = false; + } + else if (leader != MyProc) + { + MyProc->decodeGroupLeader = NULL; + /* clear proc flags */ + MyProc->decodeLocked = false; + MyProc->decodeAbortPending = false; + } + LWLockRelease(leader_lwlock); + } + /* * Reset MyLatch to the process local one. This is so that signal * handlers et al can continue using the latch after the shared latch @@ -881,9 +959,36 @@ ProcKill(int code, Datum arg) /* Since lockGroupLeader is NULL, lockGroupMembers should be empty. */ Assert(dlist_is_empty(&proc->lockGroupMembers)); - /* Return PGPROC structure (and semaphore) to appropriate freelist */ - proc->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = proc; + /* + * Return PGPROC structure (and semaphore) to appropriate freelist. + * Again check if decode group stuff will handle it later. + */ + if (proc->decodeGroupLeader == NULL) + { + proc->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = proc; + } + } + + /* + * If we're still a member of a decode group, that means we're a leader + * which has somehow exited before its children. The last remaining child + * will release our PGPROC. Otherwise, release it now. + */ + if (proc->decodeGroupLeader == NULL) + { + /* Since decodeGroupLeader is NULL, decodeGroupMembers should be empty. */ + Assert(dlist_is_empty(&proc->decodeGroupMembers)); + + /* + * Return PGPROC structure (and semaphore) to appropriate freelist + * But check if it was already done above by the lockGroup code + */ + if (proc != *procgloballist) + { + proc->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = proc; + } } /* Update shared estimate of spins_per_delay */ @@ -1887,3 +1992,322 @@ BecomeLockGroupMember(PGPROC *leader, int pid) return ok; } + +/* + * AssignDecodeGroupLeader + * Lookup process using xid and designate as decode group leader. + * + * Once this function has returned, other processes can join the decode + * group by calling BecomeDecodeGroupMember. + */ +PGPROC * +AssignDecodeGroupLeader(TransactionId xid) +{ + PGPROC *proc = NULL; + LWLock *leader_lwlock; + + Assert(xid != InvalidTransactionId); + + /* + * Lookup the backend executing this transaction. + * + * If the transaction already completed, we can bail out. + */ + proc = BackendXidGetProc(xid); + if (!proc) + return NULL; + + /* + * Process running a XID can't have a leader, it can only be + * a leader (in which case it points to itself). + */ + Assert(!proc->decodeGroupLeader || + (proc->decodeGroupLeader == proc)); + + /* + * This proc will become decodeGroupLeader if it's not already. + */ + if (proc->decodeGroupLeader == NULL) + { + volatile PGXACT *pgxact; + volatile PGPROC *leader; + + /* Create single-member group, containing this proc. */ + leader_lwlock = LockHashPartitionLockByProc(proc); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + + /* recheck we are still the same */ + leader = BackendXidGetProc(xid); + if (!leader || leader != proc) + { + LWLockRelease(leader_lwlock); + return NULL; + } + + pgxact = &ProcGlobal->allPgXact[leader->pgprocno]; + + /* + * We know the process was executing the XID a while ago, but we + * have not been holding any locks on PGPROC so it might have + * started executing something else since then. So we need to + * recheck that it is indeed still running the right XID. + * + * If it's not, the transaction must have already completed, so + * we don't need to create any decoding group. + */ + if (pgxact->xid == xid) + { + /* + * Some other decoding backend might have marked the process + * as a leader before we acquired the lock. But it must not + * be a follower of some other leader. + */ + Assert(!proc->decodeGroupLeader || + (proc->decodeGroupLeader == proc)); + + /* recheck if someone else did not already assign us */ + if (proc->decodeGroupLeader == NULL) + { + /* + * The leader is also a part of the decoding group, + * so we add it to the members list as well. + */ + proc->decodeGroupLeader = proc; + dlist_push_head(&proc->decodeGroupMembers, + &proc->decodeGroupLink); + } + } + else + { + /* proc entry is gone */ + proc = NULL; + } + LWLockRelease(leader_lwlock); + } + + if (proc) + elog(DEBUG1, "became group leader (%p)", proc); + return proc; +} + +/* + * BecomeDecodeGroupMember - designate process as decode group member + * + * This is pretty straightforward except for the possibility that the leader + * whose group we're trying to join might exit before we manage to do so; + * and the PGPROC might get recycled for an unrelated process. To avoid + * that, we require the caller to pass the PID of the intended PGPROC as + * an interlock. Returns true if we successfully join the intended lock + * group, and false if not. + */ +bool +BecomeDecodeGroupMember(PGPROC *leader, TransactionId xid) +{ + LWLock *leader_lwlock; + bool ok = false; + volatile PGXACT *pgxact; + volatile PGPROC *proc = NULL; + + /* Group leader can't become member of group */ + Assert(MyProc != leader); + + /* Can't already be a member of a group */ + Assert(MyProc->decodeGroupLeader == NULL); + + /* XID must be valid */ + Assert(TransactionIdIsValid(xid)); + + /* + * Get lock protecting the group fields. Note LockHashPartitionLockByProc + * accesses leader->pgprocno in a PGPROC that might be free. This is safe + * because all PGPROCs' pgprocno fields are set during shared memory + * initialization and never change thereafter; so we will acquire the + * correct lock even if the leader PGPROC is in process of being recycled. + */ + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + + /* Is this the leader we're looking for? */ + proc = BackendXidGetProc(xid); + if (!proc || leader != proc) + { + LWLockRelease(leader_lwlock); + return NULL; + } + pgxact = &ProcGlobal->allPgXact[leader->pgprocno]; + if (pgxact->xid == xid) + { + /* is the leader going away? */ + if (leader->decodeAbortPending) + ok = false; + else + { + /* OK, join the group */ + ok = true; + MyProc->decodeGroupLeader = leader; + dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink); + } + } + LWLockRelease(leader_lwlock); + + if (ok) + elog(DEBUG1, "became group member (%p) to (%p)", MyProc, leader); + return ok; +} + +/* + * RemoveDecodeGroupMember + * Remove a member from the decoding group of a leader. + */ +void +RemoveDecodeGroupMember(PGPROC *leader) +{ + LWLock *leader_lwlock; + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + RemoveDecodeGroupMemberLocked(leader); + LWLockRelease(leader_lwlock); + + return; +} + +/* + * RemoveDecodeGroupMemberLocked + * Remove a member from a decoding group of a leader. + * + * Assumes that the caller is holding appropriate lock on PGPROC. + */ +void +RemoveDecodeGroupMemberLocked(PGPROC *leader) +{ + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + dlist_delete(&MyProc->decodeGroupLink); + /* leader links to itself, so never empty */ + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + MyProc->decodeGroupLeader = NULL; + elog(DEBUG1, "removed group member (%p) from (%p)", MyProc, leader); + + return; +} + +/* + * LogicalDecodeRemoveTransaction + * Notify all decoding members that this transaction is going away. + * + * Wait for all decodeGroupMembers to ack back before returning from + * here but only in case of aborts. + * + * This function should be called *after* the proc has been removed + * from the procArray. + * + * If the transaction is committing, it's ok for the decoding backends + * to continue merrily - there is no danger in accessing catalogs. When + * it tries to join the decoding group, it won't find the proc anymore, + * forcing it to re-check transaction status and cache the commit + * status for future calls (see LogicalLockTransaction). + * + * In case a backend which is part of the decode group dies/crashes, + * then that would effectively cause the database to restart cleaning + * up the shared memory state + */ +void +LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit) +{ + LWLock *leader_lwlock; + dlist_mutable_iter change_i; + dlist_iter iter; + PGPROC *proc; + bool do_wait; + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + + /* + * If the proc has not been initialized as a group leader, there are + * no group members to wait for and we can terminate right away. + */ + if (leader->decodeGroupLeader == NULL) + { + Assert(dlist_is_empty(&leader->decodeGroupMembers)); + LWLockRelease(leader_lwlock); + return; + } + + /* mark the transaction as aborting */ + leader->decodeAbortPending = (!isCommit); + +recheck: + do_wait = false; + Assert(leader->decodeGroupLeader == leader); + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + if (!isCommit) + { + /* + * We need to walk the list of group members, and decide if we + * need to wait for some of them. In other words, we need to + * check if there are any processes besides the leader. + */ + dlist_foreach(iter, &leader->decodeGroupMembers) + { + proc = dlist_container(PGPROC, decodeGroupLink, iter.cur); + + /* Ignore the leader (i.e. ourselves). */ + if (proc == leader) + continue; + + /* if the proc is currently locked, wait */ + if (proc->decodeLocked) + do_wait = true; + } + + if (do_wait) + { + int rc; + LWLockRelease(leader_lwlock); + + elog(LOG, "Waiting for backends to abort decoding"); + /* + * Wait on our latch to allow decodeGroupMembers to + * go away soon + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 100L, + WAIT_EVENT_PG_SLEEP); + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + CHECK_FOR_INTERRUPTS(); + + /* Recheck decodeGroupMembers */ + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + goto recheck; + } + } + + /* + * All backends exited cleanly in case of aborts above, + * remove decodeGroupMembers now for both commit/abort cases + */ + Assert(leader->decodeGroupLeader == leader); + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + dlist_foreach_modify(change_i, &leader->decodeGroupMembers) + { + proc = dlist_container(PGPROC, decodeGroupLink, change_i.cur); + Assert(!proc->decodeLocked); + dlist_delete(&proc->decodeGroupLink); + elog(DEBUG1, "deleting group member (%p) from (%p)", + proc, leader); + proc->decodeGroupLeader = NULL; + } + Assert(dlist_is_empty(&leader->decodeGroupMembers)); + leader->decodeGroupLeader = NULL; + leader->decodeAbortPending = false; + LWLockRelease(leader_lwlock); + + return; +} diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 619c5f4d73..63b14367f0 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -117,6 +117,8 @@ extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern bool LogicalLockTransaction(ReorderBufferTXN *txn); +extern void LogicalUnlockTransaction(ReorderBufferTXN *txn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 177ef98e43..385bb486bb 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -141,6 +141,11 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_PREPARE 0x0008 +#define RBTXN_COMMIT_PREPARED 0x0010 +#define RBTXN_ROLLBACK_PREPARED 0x0020 +#define RBTXN_COMMIT 0x0040 +#define RBTXN_ROLLBACK 0x0080 /* does the txn have catalog changes */ #define rbtxn_has_catalog_changes(txn) (txn->txn_flags & RBTXN_HAS_CATALOG_CHANGES) @@ -154,6 +159,16 @@ typedef struct ReorderBufferChange * nentries_mem == nentries. */ #define rbtxn_is_serialized(txn) (txn->txn_flags & RBTXN_IS_SERIALIZED) +/* is this txn prepared? */ +#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE) +/* was this prepared txn committed in the meanwhile? */ +#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED) +/* was this prepared txn aborted in the meanwhile? */ +#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED) +/* was this txn committed in the meanwhile? */ +#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT) +/* was this prepared txn aborted in the meanwhile? */ +#define rbtxn_rollback(txn) (txn->txn_flags & RBTXN_ROLLBACK) typedef struct ReorderBufferTXN { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 5c19a61dcf..ae842b64d0 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -200,6 +200,26 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * Support for decoding groups. Use LockHashPartitionLockByProc on the group + * leader to get the LWLock protecting these fields. + * + * For prepared and uncommitted transactions, decoding backends working on + * the same XID will link themselves up to the corresponding PGPROC + * entry (decodeGroupLeader). + * + * They will remove themselves when they are done decoding. + * + * If the prepared or uncommitted transaction decides to abort, then + * the decodeGroupLeader will set the decodeAbortPending flag allowing + * the decodeGroupMembers to abort their decoding appropriately + */ + PGPROC *decodeGroupLeader; /* decode group leader, if I'm a member */ + dlist_head decodeGroupMembers; /* list of members, if I'm a leader */ + dlist_node decodeGroupLink; /* my member link, if I'm a member */ + bool decodeLocked; /* is it currently locked by this proc? */ + bool decodeAbortPending; /* is the decode group leader aborting? */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ @@ -327,4 +347,10 @@ extern PGPROC *AuxiliaryPidGetProc(int pid); extern void BecomeLockGroupLeader(void); extern bool BecomeLockGroupMember(PGPROC *leader, int pid); +extern PGPROC *AssignDecodeGroupLeader(TransactionId xid); +extern bool BecomeDecodeGroupMember(PGPROC *leader, TransactionId pid); +extern void RemoveDecodeGroupMember(PGPROC *leader); +extern void RemoveDecodeGroupMemberLocked(PGPROC *leader); +extern void LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit); + #endif /* PROC_H */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 75bab2985f..776de2470e 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -97,6 +97,7 @@ extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids extern PGPROC *BackendPidGetProc(int pid); extern PGPROC *BackendPidGetProcWithLock(int pid); +extern PGPROC *BackendXidGetProc(TransactionId xid); extern int BackendXidGetPid(TransactionId xid); extern bool IsBackendPid(int pid); -- 2.15.1 (Apple Git-101)