From d45c7c4f13d1ababfedd5cdc5417335eac8cc5b9 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 9 Apr 2020 10:55:19 +0530 Subject: [PATCH v21 04/12] Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside. When a transaction aborts, its changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions this is not an issue, and we never decode transactions that abort before the decoding starts. But for in-progress transactions, this may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend decoding a specific uncommitted transaction. The decoding logic on the receipt of such an sqlerrcode aborts the ongoing decoding and returns gracefully. --- doc/src/sgml/logicaldecoding.sgml | 9 +++-- src/backend/access/heap/heapam.c | 10 ++++++ src/backend/access/index/genam.c | 53 ++++++++++++++++++++++++++++ src/backend/access/table/tableam.c | 8 +++++ src/backend/utils/time/snapmgr.c | 13 +++++++ src/include/access/tableam.h | 55 ++++++++++++++++++++++++++++++ src/include/utils/snapmgr.h | 2 ++ 7 files changed, 147 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 1b56daa4bb..5f7394f3c1 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -432,9 +432,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. 
That, among others, - includes writing to tables, performing DDL changes, and - calling pg_current_xact_id(). + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* + scan APIs only. Access via the heap_* scan APIs will + error out. Additionally, any actions leading to transaction ID assignment + are prohibited. That, among others, includes writing to tables, performing + DDL changes, and calling pg_current_xact_id(). diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 94eb37d48d..2d77107c4f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1288,6 +1288,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg_internal("only heap AM is supported"))); + /* + * We don't expect direct calls to heap_getnext with valid CheckXidAlive + * for catalog or regular tables. See detailed comments at snapmgr.c + * where these variables are declared. Normally we have such a check at + * tableam level API but this is called from many places so we need to + * ensure it here. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected heap_getnext call during logical decoding"); + /* Note: no locking manipulations needed */ if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index dfba5ae39a..446b8cbc86 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -28,6 +28,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation, sysscan->iscan = NULL; } + /* + * If CheckXidAlive is set then set a flag to indicate that system table + * scan is in-progress. See detailed comments at snapmgr.c where these + * variables are declared. + */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = true; + return sysscan; } +/* + * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive. + * + * Error out, if CheckXidAlive is aborted. We can't directly use + * TransactionIdDidAbort as after crash such transaction might not have been + * marked as aborted. See detailed comments at snapmgr.c where the variable + * is declared. + */ +static inline void +HandleConcurrentAbort() +{ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); +} + /* * systable_getnext --- get next tuple in a heap-or-index scan * @@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan) } } + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. 
+ */ + HandleConcurrentAbort(); + return htup; } @@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) sysscan->slot, freshsnap); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return result; } @@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan) if (sysscan->snapshot) UnregisterSnapshot(sysscan->snapshot); + /* + * Reset the bsysscan flag at the end of the systable scan. See + * detailed comments at snapmgr.c where these variables are declared. + */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = false; + pfree(sysscan); } @@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return htup; } diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index c814733b22..2f52b407c6 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -230,6 +230,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid) Relation rel = scan->rs_rd; const TableAmRoutine *tableam = rel->rd_tableam; + /* + * We don't expect direct calls to table_tuple_get_latest_tid with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding"); + /* * Since this can be called with user-supplied TID, don't trust the input * too much. 
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 1c063c592c..9f1ecd123f 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -153,6 +153,19 @@ static Snapshot SecondarySnapshot = NULL; static Snapshot CatalogSnapshot = NULL; static Snapshot HistoricSnapshot = NULL; +/* + * CheckXidAlive is a xid value pointing to a possibly ongoing (sub) + * transaction.  Currently, it is used in logical decoding.  It's possible + * that such transactions can get aborted while the decoding is ongoing in + * which case we skip decoding that particular transaction. To ensure that, we + * check whether the CheckXidAlive is aborted after fetching the tuple from + * system tables.  We also ensure that during logical decoding we never + * directly access the tableam or heap APIs because we are checking for the + * concurrent aborts only in systable_* APIs. + */ +TransactionId CheckXidAlive = InvalidTransactionId; +bool bsysscan = false; + /* * These are updated by GetSnapshotData. We initialize them this way * for the convenience of TransactionIdIsInProgress: even in bootstrap diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8c34935c34..9d890d3c4b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -21,6 +21,7 @@ #include "access/sdir.h" #include "utils/guc.h" #include "utils/rel.h" +#include "utils/snapmgr.h" #include "utils/snapshot.h" @@ -903,6 +904,15 @@ static inline bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { slot->tts_tableOid = RelationGetRelid(sscan->rs_rd); + + /* + * We don't expect direct calls to table_scan_getnextslot with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding"); + return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } @@ -1015,6 +1025,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan, TupleTableSlot *slot, bool *call_again, bool *all_dead) { + /* + * We don't expect direct calls to table_index_fetch_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding"); return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot, slot, call_again, @@ -1054,6 +1071,14 @@ table_tuple_fetch_row_version(Relation rel, Snapshot snapshot, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_tuple_fetch_row_version with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding"); + return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot); } @@ -1710,6 +1735,14 @@ static inline bool table_scan_bitmap_next_block(TableScanDesc scan, struct TBMIterateResult *tbmres) { + /* + * We don't expect direct calls to table_scan_bitmap_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, tbmres); } @@ -1727,6 +1760,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan, struct TBMIterateResult *tbmres, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_bitmap_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan, tbmres, slot); @@ -1745,6 +1786,13 @@ static inline bool table_scan_sample_next_block(TableScanDesc scan, struct SampleScanState *scanstate) { + /* + * We don't expect direct calls to table_scan_sample_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate); } @@ -1761,6 +1809,13 @@ table_scan_sample_next_tuple(TableScanDesc scan, struct SampleScanState *scanstate, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_sample_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments at + * snapmgr.c where these variables are declared. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate, slot); } diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index b28d13ce84..5af6df698b 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -145,6 +145,8 @@ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* Support for catalog timetravel for logical decoding */ struct HTAB; +extern TransactionId CheckXidAlive; +extern bool bsysscan; extern struct HTAB *HistoricSnapshotGetTupleCids(void); extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); extern void TeardownHistoricSnapshot(bool is_error); -- 2.23.0