From 7640800a2a1342a4ddeed4ffa049bf80aa99d4e1 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Thu, 26 Jul 2018 18:45:26 +0530 Subject: [PATCH 3/4] Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside. When a transaction aborts, its changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions, this is not an issue, and we never decode transactions that abort before the decoding starts. But for in-progress transactions - for example when decoding prepared transactions on PREPARE (and not COMMIT PREPARED as before), this may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend decoding a specific uncommitted transaction. The decoding logic, on receipt of such an sqlerrcode, aborts the ongoing decoding and returns gracefully. 
--- doc/src/sgml/logicaldecoding.sgml | 5 ++- src/backend/access/heap/heapam.c | 51 +++++++++++++++++++++++++ src/backend/access/index/genam.c | 35 +++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 32 +++++++++++++--- src/backend/utils/time/snapmgr.c | 25 +++++++++++- src/include/utils/snapmgr.h | 4 +- 6 files changed, 143 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index a89e4d5184..d76afbda05 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -421,7 +421,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. That, among others, + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* scan APIs only. + Access via the heap_* scan APIs will error out. + Additionally, any actions leading to transaction ID assignment are prohibited. That, among others, includes writing to tables, performing DDL changes, and calling txid_current(). diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 72395a50b8..ae9d24c164 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1834,6 +1834,17 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction) { + /* + * We don't expect direct calls to heap_getnext with valid + * CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(scan->rs_rd) || + RelationIsUsedAsCatalogTable(scan->rs_rd)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_getnext call"))); + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ @@ -1914,6 +1925,16 @@ heap_fetch(Relation relation, OffsetNumber offnum; bool valid; + /* + * We don't expect direct calls to heap_fetch with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_fetch call"))); + /* * Fetch and pin the appropriate page of the relation. */ @@ -2046,6 +2067,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, bool valid; bool skip; + /* + * We don't expect direct calls to heap_hot_search_buffer with + * valid CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search_buffer call"))); + /* If this is not the first call, previous call returned a (live!) tuple */ if (all_dead) *all_dead = first_call; @@ -2187,6 +2218,16 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, Buffer buffer; HeapTupleData heapTuple; + /* + * We don't expect direct calls to heap_hot_search with + * valid CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search call"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(buffer, BUFFER_LOCK_SHARE); result = heap_hot_search_buffer(tid, relation, buffer, snapshot, @@ -2216,6 +2257,16 @@ heap_get_latest_tid(Relation relation, ItemPointerData ctid; TransactionId priorXmax; + /* + * We don't expect direct calls to heap_get_latest_tid with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_get_latest_tid call"))); + /* this is to avoid Assert failures on bad input */ if (!ItemPointerIsValid(tid)) return; diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index 9d08775687..9220dcce83 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -25,6 +25,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -423,6 +424,17 @@ systable_getnext(SysScanDesc sysscan) else htup = heap_getnext(sysscan->scan, ForwardScanDirection); + /* + * If CheckXidAlive is valid, then we check if it aborted. 
If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } @@ -476,6 +488,18 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->rs_cbuf); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); } + + /* + * If CheckXidAlive is valid, then we check if it aborted. If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return result; } @@ -593,6 +617,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * If CheckXidAlive is valid, then we check if it aborted. 
If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2fffc90606..96d52d32c1 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -599,7 +599,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); + SetupHistoricSnapshot(snapshot_now, NULL, xid); PG_TRY(); { rb->message(rb, txn, lsn, false, prefix, message_size, message); @@ -1405,6 +1405,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; + MemoryContext ccxt = CurrentMemoryContext; txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; @@ -1431,7 +1432,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, ReorderBufferBuildTupleCidHash(rb, txn); /* setup the initial snapshot */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); /* * Decoding needs access to syscaches et al., which in turn use @@ -1672,7 +1673,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, /* and continue with the new one */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -1692,7 +1693,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, snapshot_now->curcid = command_id; TeardownHistoricSnapshot(false); - 
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); /* * Every time the CommandId is incremented, we could @@ -1777,6 +1778,20 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, PG_CATCH(); { /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + + /* + * if the catalog scan access returned an error of + * rollback, then abort on the other side as well + */ + if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + rb->abort(rb, txn, commit_lsn); + } + if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); @@ -1800,7 +1815,14 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); - PG_RE_THROW(); + /* re-throw only if it's not an abort */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + FlushErrorState(); } PG_END_TRY(); } diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index edf59efc29..0354fc9da9 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -151,6 +151,13 @@ static Snapshot SecondarySnapshot = NULL; static Snapshot CatalogSnapshot = NULL; static Snapshot HistoricSnapshot = NULL; +/* + * An xid value pointing to a possibly ongoing or a prepared transaction. + * Currently used in logical decoding. It's possible that such transactions + * can get aborted while the decoding is ongoing. + */ +TransactionId CheckXidAlive = InvalidTransactionId; + /* * These are updated by GetSnapshotData. 
We initialize them this way * for the convenience of TransactionIdIsInProgress: even in bootstrap @@ -1995,10 +2002,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin) * Setup a snapshot that replaces normal catalog snapshots that allows catalog * access to behave just like it did at a certain point in the past. * + * If a valid xid is passed in, we check if it is uncommitted and track it in + * CheckXidAlive. This is to re-check XID status while accessing catalog. + * * Needed for logical decoding. */ void -SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) +SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids, + TransactionId snapshot_xid) { Assert(historic_snapshot != NULL); @@ -2007,8 +2018,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) /* setup (cmin, cmax) lookup hash */ tuplecid_data = tuplecids; -} + /* + * setup CheckXidAlive if it's not committed yet. We don't check + * if the xid aborted. That will happen during catalog access. + */ + if (TransactionIdIsValid(snapshot_xid) && + !TransactionIdDidCommit(snapshot_xid)) + CheckXidAlive = snapshot_xid; + else + CheckXidAlive = InvalidTransactionId; +} /* * Make catalog snapshots behave normally again. 
@@ -2018,6 +2038,7 @@ TeardownHistoricSnapshot(bool is_error) { HistoricSnapshot = NULL; tuplecid_data = NULL; + CheckXidAlive = InvalidTransactionId; } bool diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 83806f3040..bad2053477 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -100,8 +100,10 @@ extern char *ExportSnapshot(Snapshot snapshot); /* Support for catalog timetravel for logical decoding */ struct HTAB; +extern TransactionId CheckXidAlive; extern struct HTAB *HistoricSnapshotGetTupleCids(void); -extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); +extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids, + TransactionId snapshot_xid); extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); -- 2.15.2 (Apple Git-101.1)