From 6d2e8ba8253d81588ea1331bc5b70992780c0e89 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Mon, 5 Sep 2016 15:30:53 +0800 Subject: [PATCH 09/10] Logical decoding on standby * Make walsender aware of ProcSignal and recovery conflicts, make walsender exit with recovery conflict on upstream drop database when it has an active logical slot on that database. * Allow GetOldestXmin to omit catalog_xmin, be called already locked. * Send catalog_xmin separately in hot_standby_feedback messages. * Store catalog_xmin separately on a physical slot if received in hot_standby_feedback * Separate the catalog_xmin used by vacuum from ProcArray's replication_slot_catalog_xmin, requiring that xlog be emitted before vacuum can remove no longer needed catalogs, store it in checkpoints, make vacuum and bgwriter advance it. * During decoding startup check whether catalog_xmin requirement can be satisfied and bail out if it can not * Add a new recovery conflict type for conflict with catalog_xmin. Abort in-progress logical decoding sessions with conflict with recovery where needed catalog_xmin is too old * Make extra efforts to reserve master's catalog_xmin during decoding startup on standby. * Try to make sure hot_standby_feedback is active when starting logical decoding. 
* Remove checks preventing starting logical decoding on standby --- contrib/pg_visibility/pg_visibility.c | 4 +- contrib/pgstattuple/pgstatapprox.c | 2 +- doc/src/sgml/protocol.sgml | 33 +- src/backend/access/heap/heapam.c | 2 +- src/backend/access/heap/rewriteheap.c | 3 +- src/backend/access/rmgrdesc/xactdesc.c | 9 + src/backend/access/transam/varsup.c | 15 + src/backend/access/transam/xact.c | 55 +++ src/backend/access/transam/xlog.c | 26 +- src/backend/catalog/index.c | 2 +- src/backend/commands/analyze.c | 2 +- src/backend/commands/dbcommands.c | 6 + src/backend/commands/vacuum.c | 13 +- src/backend/postmaster/bgwriter.c | 9 + src/backend/postmaster/pgstat.c | 2 + src/backend/replication/logical/decode.c | 11 + src/backend/replication/logical/logical.c | 323 ++++++++++++++- src/backend/replication/slot.c | 91 ++++- src/backend/replication/walreceiver.c | 52 ++- src/backend/replication/walsender.c | 135 ++++-- src/backend/storage/ipc/procarray.c | 201 +++++++-- src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 147 ++++++- src/backend/tcop/postgres.c | 38 +- src/bin/pg_controldata/pg_controldata.c | 2 + src/include/access/transam.h | 5 + src/include/access/xact.h | 12 +- src/include/catalog/pg_control.h | 1 + src/include/pgstat.h | 3 +- src/include/replication/slot.h | 1 + src/include/replication/walreceiver.h | 3 + src/include/storage/procarray.h | 9 +- src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 + .../recovery/t/010_logical_decoding_on_replica.pl | 454 +++++++++++++++++++++ 35 files changed, 1548 insertions(+), 129 deletions(-) create mode 100644 src/test/recovery/t/010_logical_decoding_on_replica.pl diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c index 9985e3e..4fa3ad4 100644 --- a/contrib/pg_visibility/pg_visibility.c +++ b/contrib/pg_visibility/pg_visibility.c @@ -538,7 +538,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) if 
(all_visible) { /* Don't pass rel; that will fail in recovery. */ - OldestXmin = GetOldestXmin(NULL, true); + OldestXmin = GetOldestXmin(NULL, true, false); } rel = relation_open(relid, AccessShareLock); @@ -660,7 +660,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) * a buffer lock. And this shouldn't happen often, so it's * worth being careful so as to avoid false positives. */ - RecomputedOldestXmin = GetOldestXmin(NULL, true); + RecomputedOldestXmin = GetOldestXmin(NULL, true, false); if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin)) record_corrupt_item(items, &tuple.t_self); diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c index f524fc4..5b33c97 100644 --- a/contrib/pgstattuple/pgstatapprox.c +++ b/contrib/pgstattuple/pgstatapprox.c @@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat) TransactionId OldestXmin; uint64 misc_count = 0; - OldestXmin = GetOldestXmin(rel, true); + OldestXmin = GetOldestXmin(rel, true, false); bstrategy = GetAccessStrategy(BAS_BULKREAD); nblocks = RelationGetNumberOfBlocks(rel); diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e41c650..3b8f06f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1807,10 +1807,11 @@ The commands accepted in walsender mode are: - The standby's current xmin. This may be 0, if the standby is - sending notification that Hot Standby feedback will no longer - be sent on this connection. Later non-zero messages may - reinitiate the feedback mechanism. + The standby's current global xmin, excluding the catalog_xmin from any + replication slots. If both this value and the following + catalog_xmin are 0 this is treated as a notification that Hot Standby + feedback will no longer be sent on this connection. Later non-zero + messages may reinitiate the feedback mechanism. @@ -1820,7 +1821,29 @@ The commands accepted in walsender mode are: - The standby's current epoch. 
+ The epoch of the global xmin xid on the standby. + + + + + + Int32 + + + + The lowest catalog_xmin of any replication slots on the standby. Set to 0 + if no catalog_xmin exists on the standby or if hot standby feedback is being + disabled. New in 10.0. + + + + + + Int32 + + + + The epoch of the catalog_xmin xid on the standby. New in 10.0. diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ea579a0..d041e92 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7300,7 +7300,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid, * ratchet forwards latestRemovedXid to the greatest one found. * This is used as the basis for generating Hot Standby conflicts, so * if a tuple was never visible then removing it should not conflict - * with queries. + * with queries or logical decoding output plugin callbacks. */ void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 17584ba..c514b7b 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -810,7 +810,8 @@ logical_begin_heap_rewrite(RewriteState state) if (!state->rs_logical_rewrite) return; - ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin); + /* Use the catalog_xmin being retained by vacuum */ + ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL); /* * If there are no logical slots in progress we don't need to do anything, diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..f454d9d 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_CATALOG_XMIN_ADV) + { + xl_xact_catalog_xmin_advance 
*xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record); + + appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin); + } } const char * @@ -324,6 +330,9 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_CATALOG_XMIN_ADV: + id = "CATALOG_XMIN"; + break; } return id; diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 2f7e645..f786056 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -393,6 +393,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) } } +/* + * Set the global oldest catalog_xmin used to determine when tuples + * may be removed from catalogs and user-catalogs accessible from logical + * decoding. + * + * Only to be called from the startup process or by UpdateOldestCatalogXmin(), + * which ensures the update is properly written to xlog first. + */ +void +SetOldestCatalogXmin(TransactionId oldestCatalogXmin) +{ + Assert(InRecovery || !IsUnderPostmaster || AmStartupProcess() || LWLockHeldByMe(ProcArrayLock)); + elog(DEBUG1, "XXX advancing catalogXmin from %u to %u", ShmemVariableCache->oldestCatalogXmin, oldestCatalogXmin); + ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin; +} /* * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating? diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e47fd44..73f5fc0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5641,6 +5641,61 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_CATALOG_XMIN_ADV) + { + xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record); + + /* + * Unless logical decoding is possible on this node, we don't care about + * this record. 
+ */ + if (!XLogLogicalInfoActive() || max_replication_slots == 0) + return; + + /* + * Apply the new catalog_xmin limit immediately. New decoding sessions + * will refuse to start if their slot is past it, and old ones will + * notice when we signal them with a recovery conflict. There's no + * effect on the catalogs themselves yet, so it's safe for backends + * with older catalog_xmins to still exist. + * + * We don't have to take ProcArrayLock since only the startup process + * is allowed to change oldestCatalogXmin when we're in recovery. + */ + SetOldestCatalogXmin(xlrec->new_catalog_xmin); + + /* + * Notify any active logical decoding sessions to terminate if they + * need the catalogs we're going to be allowed to remove after + * replaying this record. + */ + ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin); + } else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * Record when we advance the catalog_xmin used for tuple removal + * so standbys find out before we remove catalog tuples they might + * need for logical decoding. 
+ */ +XLogRecPtr +XactLogCatalogXminUpdate(TransactionId new_catalog_xmin) +{ + XLogRecPtr ptr = InvalidXLogRecPtr; + + if (XLogInsertAllowed()) + { + xl_xact_catalog_xmin_advance xlrec; + + xlrec.new_catalog_xmin = new_catalog_xmin; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance); + + ptr = XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV); + } + + return ptr; +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f8ffa5c..4c39a36 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4840,6 +4840,7 @@ BootStrapXLOG(void) checkPoint.nextMultiOffset = 0; checkPoint.oldestXid = FirstNormalTransactionId; checkPoint.oldestXidDB = TemplateDbOid; + checkPoint.oldestCatalogXmin = InvalidTransactionId; checkPoint.oldestMulti = FirstMultiXactId; checkPoint.oldestMultiDB = TemplateDbOid; checkPoint.oldestCommitTsXid = InvalidTransactionId; @@ -4852,6 +4853,7 @@ BootStrapXLOG(void) ShmemVariableCache->oidCount = 0; MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB); SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId); @@ -6430,6 +6432,9 @@ StartupXLOG(void) (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u", checkPoint.oldestXid, checkPoint.oldestXidDB))); ereport(DEBUG1, + (errmsg_internal("oldest catalog-only transaction ID: %u", + checkPoint.oldestCatalogXmin))); + ereport(DEBUG1, (errmsg_internal("oldest MultiXactId: %u, in database %u", checkPoint.oldestMulti, checkPoint.oldestMultiDB))); ereport(DEBUG1, @@ -6446,6 +6451,7 @@ StartupXLOG(void) ShmemVariableCache->oidCount = 0; MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + 
SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB); SetCommitTsLimit(checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid); @@ -8506,6 +8512,7 @@ CreateCheckPoint(int flags) checkPoint.nextXid = ShmemVariableCache->nextXid; checkPoint.oldestXid = ShmemVariableCache->oldestXid; checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB; + checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin; LWLockRelease(XidGenLock); LWLockAcquire(CommitTsLock, LW_SHARED); @@ -8709,7 +8716,7 @@ CreateCheckPoint(int flags) * StartupSUBTRANS hasn't been called yet. */ if (!RecoveryInProgress()) - TruncateSUBTRANS(GetOldestXmin(NULL, false)); + TruncateSUBTRANS(GetOldestXmin(NULL, false, false)); /* Real work is done, but log and update stats before releasing lock. */ LogCheckpointEnd(false); @@ -9072,7 +9079,7 @@ CreateRestartPoint(int flags) * this because StartupSUBTRANS hasn't been called yet. */ if (EnableHotStandby) - TruncateSUBTRANS(GetOldestXmin(NULL, false)); + TruncateSUBTRANS(GetOldestXmin(NULL, false, false)); /* Real work is done, but log and update before releasing lock. */ LogCheckpointEnd(true); @@ -9263,6 +9270,16 @@ XLogReportParameters(void) XLogFlush(recptr); } + /* + * If wal_level was lowered from WAL_LEVEL_LOGICAL we no longer + * require oldestCatalogXmin in checkpoints and it no longer + * makes sense, so update shmem and xlog the change. This will + * get written out in the next checkpoint. 
+ */ + if (ControlFile->wal_level >= WAL_LEVEL_LOGICAL && + wal_level < WAL_LEVEL_LOGICAL) + UpdateOldestCatalogXmin(true); + ControlFile->MaxConnections = MaxConnections; ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_prepared_xacts = max_prepared_xacts; @@ -9431,6 +9448,7 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceOldest(checkPoint.oldestMulti, checkPoint.oldestMultiDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); /* * If we see a shutdown checkpoint while waiting for an end-of-backup @@ -9529,8 +9547,8 @@ xlog_redo(XLogReaderState *record) checkPoint.oldestMultiDB); if (TransactionIdPrecedes(ShmemVariableCache->oldestXid, checkPoint.oldestXid)) - SetTransactionIdLimit(checkPoint.oldestXid, - checkPoint.oldestXidDB); + SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 08b0989..6c08739 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2272,7 +2272,7 @@ IndexBuildHeapRangeScan(Relation heapRelation, { snapshot = SnapshotAny; /* okay to ignore lazy VACUUMs here */ - OldestXmin = GetOldestXmin(heapRelation, true); + OldestXmin = GetOldestXmin(heapRelation, true, false); } scan = heap_beginscan_strat(heapRelation, /* relation */ diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index f4afcd9..718ebba 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -993,7 +993,7 @@ acquire_sample_rows(Relation onerel, int elevel, totalblocks = RelationGetNumberOfBlocks(onerel); /* Need a cutoff xmin for HeapTupleSatisfiesVacuum */ - 
OldestXmin = GetOldestXmin(onerel, true); + OldestXmin = GetOldestXmin(onerel, true, false); /* Prepare for sampling block numbers */ BlockSampler_Init(&bs, totalblocks, targrows, random()); diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 0919ad8..3efc833 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -2119,11 +2119,17 @@ dbase_redo(XLogReaderState *record) * InitPostgres() cannot fully re-execute concurrently. This * avoids backends re-connecting automatically to same database, * which can happen in some cases. + * + * This will lock out walsenders trying to connect to db-specific + * slots for logical decoding too, so it's safe for us to drop slots. */ LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock); ResolveRecoveryConflictWithDatabase(xlrec->db_id); } + /* Drop any database-specific replication slots */ + ReplicationSlotsDropDBSlots(xlrec->db_id); + /* Drop pages for this database that are in the shared buffer cache */ DropDatabaseBuffers(xlrec->db_id); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index b1be2f7..f68673d 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -488,6 +488,15 @@ vacuum_set_xid_limits(Relation rel, MultiXactId safeMxactLimit; /* + * When logical decoding is enabled, we must write any advance of + * catalog_xmin to xlog before we allow VACUUM to remove those tuples. + * This ensures that any standbys doing logical decoding can cancel + * decoding sessions and invalidate slots if we remove tuples they + * still need. + */ + UpdateOldestCatalogXmin(false); + + /* * We can always ignore processes running lazy vacuum. This is because we * use these values only for deciding which tuples we must keep in the * tables. 
Since lazy vacuum doesn't write its XID anywhere, it's safe to @@ -497,7 +506,7 @@ vacuum_set_xid_limits(Relation rel, * always an independent transaction. */ *oldestXmin = - TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel); + TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel); Assert(TransactionIdIsNormal(*oldestXmin)); @@ -909,7 +918,7 @@ vac_update_datfrozenxid(void) * committed pg_class entries for new tables; see AddNewRelationTuple(). * So we cannot produce a wrong minimum by starting with this. */ - newFrozenXid = GetOldestXmin(NULL, true); + newFrozenXid = GetOldestXmin(NULL, true, false); /* * Similarly, initialize the MultiXact "min" with the value that would be diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 25020ab..6d49b0e 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -51,6 +51,7 @@ #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "storage/shmem.h" #include "storage/smgr.h" #include "storage/spin.h" @@ -295,6 +296,14 @@ BackgroundWriterMain(void) } /* + * Eagerly advance the catalog_xmin used by vacuum if we're not + * a standby. This ensures that standbys waiting for catalog_xmin + * confirmation receive it promptly. 
+ */ + if (!RecoveryInProgress()) + UpdateOldestCatalogXmin(false); + + /* * Log a new xl_running_xacts every now and then so replication can * get into a consistent state faster (think of suboverflowed * snapshots) and clean up resources (locks, KnownXids*) more diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 61e6a2c..92d3601 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3303,6 +3303,8 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_WAL_WRITER_MAIN: event_name = "WalWriterMain"; break; + case WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE: + event_name = "StandbyLogicalSlotCreate"; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 46cd5ba..5eaf42f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -288,6 +288,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; + case XLOG_XACT_CATALOG_XMIN_ADV: + /* + * The global catalog_xmin has been advanced. By the time we see + * this in logical decoding it no longer matters, since it's + * guaranteed that all later records will be consistent with the + * advanced catalog_xmin, so we ignore it here. If we were running + * on a standby and it applied a catalog xmin advance past our + * needed catalog_xmin we would've already been terminated with a + * conflict with standby error. 
+ */ + break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1512be5..9912800 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "pgstat.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -38,11 +39,14 @@ #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/memutils.h" +#include "utils/ps_status.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState @@ -68,6 +72,10 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void WaitForMasterCatalogXminReservation(ReplicationSlot *slot); + +static void EnsureActiveLogicalSlotValid(void); + /* * Make sure the current settings & environment are capable of doing logical * decoding. @@ -87,23 +95,53 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. 
- * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + bool walrcv_running, walrcv_has_slot; + + SpinLockAcquire(&WalRcv->mutex); + walrcv_running = WalRcv->pid != 0; + walrcv_has_slot = WalRcv->slotname[0] != '\0'; + SpinLockRelease(&WalRcv->mutex); + + /* + * The walreceiver should be running when we try to create a slot. If + * we're unlucky enough to catch the walreceiver just as it's + * restarting after an error, well, the client can just retry. We don't + * bother to sleep and re-check. + */ + if (!walrcv_running) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("streaming replication is not active"), + errhint("Logical decoding on standby requires that streaming replication be configured and active. Ensure that primary_conninfo is correct in recovery.conf and check for streaming replication errors in the logs."))); + + /* + * When decoding on a standby we need a physical slot to be used by the + * walreceiver so we can pin the upstream's catalog_xmin down even + * over connection loss and restarts. This also gives us somewhere to + * record our needed catalog xmin on the master. + */ + if (!walrcv_has_slot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no replication slot configured for connection to master"), + errhint("Logical decoding on standby requires that a physical replication slot be used to connect the standby to the master."))); + + /* + * We need hot_standby_feedback to make sure the master doesn't vacuum + * away tuples we need. + * + * This check doesn't stop the user disabling it once we check, but they + * could also drop and re-create the physical replication slot without + * our noticing or do other silly things. Don't do that. If they do it + * anyway we'll notice and fail with conflict with recovery later. 
+ */ + if (!hot_standby_feedback) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("hot_standby_feedback is not enabled"))); + } } /* @@ -126,6 +164,8 @@ StartupDecodingContext(List *output_plugin_options, /* shorter lines... */ slot = MyReplicationSlot; + EnsureActiveLogicalSlotValid(); + context = AllocSetContextCreate(CurrentMemoryContext, "Logical decoding context", ALLOCSET_DEFAULT_SIZES); @@ -266,7 +306,9 @@ CreateInitDecodingContext(char *plugin, * xmin horizons by other backends, get the safe decoding xid, and inform * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is - * protecting against vacuum. + * protecting against vacuum - if we're on the master. If we're running on + * a replica, we have to wait until hot_standby_feedback locks in our + * needed catalogs, per details on WaitForMasterCatalogXminReservation(). * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -276,6 +318,12 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotsComputeRequiredXmin(true); + if (RecoveryInProgress()) + WaitForMasterCatalogXminReservation(slot); + + Assert(TransactionIdPrecedesOrEquals(ShmemVariableCache->oldestCatalogXmin, + slot->data.catalog_xmin)); + LWLockRelease(ProcArrayLock); /* @@ -963,3 +1011,244 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Wait until the master's catalog_xmin is set, advancing our catalog_xmin + * if needed. Caller must hold exclusive ProcArrayLock, which this function will + * temporarily release while sleeping but will re-acquire. + * + * We're pretty much just hoping that, if someone else already has a + * catalog_xmin reservation affecting the master, it stays where we want it + * until our own hot_standby_feedback can pin it down. 
+ * + * When we're creating a slot on a standby we can't directly set the + * master's catalog_xmin; the catalog_xmin is set locally, then relayed + * over hot_standby_feedback. The master may remove the catalogs we + * asked to reserve between when we set a local catalog_xmin and when + * hs feedback makes that take effect on the master. We need a feedback + * reply mechanism here, where: + * + * - we tentatively reserve catalog_xmin locally + * - we wake the walreceiver by setting its latch + * - walreceiver sends hs_feedback + * - upstream walsender sends a new 'hs_feedback reply' message with + * actual (xmin, catalog_xmin) reserved. + * - walreceiver sees reply and updates ShmemVariableCache or some other + * handy bit of shmem with hs feedback reservations from reply + * - we poll the reservations while we wait + * - we set our catalog_xmin to that value, which might be later if + * we missed our requested reservation, or might be earlier if + * someone else is holding down catalog_xmin on master. We got a hs + * feedback reply so we know it's reserved. + * + * For cascading, the actual reservation will need to cascade up the + * chain by walsender setting its own walreceiver's latch in turn, etc. + * + * For now, we just set the local slot catalog_xmin and sleep until + * oldestCatalogXmin equals or passes our reservation. This is fine if we're + * the only decoding session, but it is vulnerable to races if slots on the + * master or other decoding sessions on other standbys connected to the same + * master exist. They might advance their reservation before our hs_feedback + * locks it down, allowing vacuum to remove tuples we need. So we might start + * decoding on our slot then error with a conflict with recovery when we see + * catalog_xmin advance. 
+ */ +static void +WaitForMasterCatalogXminReservation(ReplicationSlot *slot) +{ + TimestampTz waitStart; + char *new_status; + XLogRecPtr firstWaitWalEnd, lastWaitWalEnd; + + Assert(LWLockHeldByMe(ProcArrayLock)); + Assert(TransactionIdIsValid(slot->effective_catalog_xmin)); + Assert(slot->effective_catalog_xmin == slot->data.catalog_xmin); + + waitStart = GetCurrentTimestamp(); + new_status = NULL; /* we haven't changed the ps display */ + + /* + * The master doesn't explicitly reply to hot standby feedback or + * identify which message is the most recent, nor does it report + * the catalog_xmin reserved. + * + * This leaves a potential race. If catalog_xmin is already pinned down by + * some other slot on the master or another standby, + * ShmemVariableCache->oldestCatalogXmin will be set by it. We don't know + * if our hot standby feedback is in effect and pinning down catalog_xmin + * yet. If we start at the current oldestCatalogXmin the other slot might + * advance and allow vacuum to remove tuples we need before our hot standby + * feedback can lock it in. This may result in a conflict with standby at + * some point after we create the slot and start decoding, when we see the + * new xl_xact_catalog_xmin_advance record, unless our own catalog_xmin has + * advanced enough by then that we no longer need the removed catalogs. + * That can only happen if the xact holding down catalog_xmin has committed + * by the time the needed catalogs are removed, so we can decode it, + * advance confirmed_flush_lsn, and advance restart_lsn + catalog_xmin. + * + * To reduce the chances of triggering this race we force immediate + * hot_standby_feedback, wait for a new latestWalEnd report from the + * sender, and wait until we replay past that before we take the + * catalog_xmin to start from. Without the ability to ask the walsender + * to verify receipt of, and successful reservation of, a specific hot + * standby feedback message this is the best we can do. 
+ * + * If we lose the race, decoding will fail with a recovery conflict later. + * The client will have to drop the slot and try again. + * + * Users can further mitigate this risk with a sufficiently high + * vacuum_defer_cleanup_age. + * + * Users can completely prevent this problem by creating a temporary + * logical slot on the master and waiting for the replica to catch up to + * the master's xlog insert position before they create a slot on the + * replica. Then wait until a catalog_xmin is reported on the replica's + * physical slot before dropping the temporary slot on the master. + * + * TODO: get reply from server explicitly confirming that it has applied + * our hs_feedback and what the lowest catalog_xmin it can honour is. + * We'll need some kind of cookie so we can tell the server is replying + * to us not someone else, especially in cascading setups. + */ + + firstWaitWalEnd = lastWaitWalEnd = WalRcv->latestWalEnd; + + WalRcvForceReply(); + + while (lastWaitWalEnd == firstWaitWalEnd || + GetXLogReplayRecPtr(NULL) < lastWaitWalEnd || + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) + { + int ret; + XLogRecPtr ptr = GetXLogReplayRecPtr(NULL); + + elog(DEBUG1, "XXX firstEnd %X/%X, lastEnd %X/%X; ptr %X/%X; oldestCatalogXmin %u", + (uint32)(firstWaitWalEnd>>32), (uint32)(firstWaitWalEnd), + (uint32)(lastWaitWalEnd>>32), (uint32)(lastWaitWalEnd), + (uint32)(ptr>>32), (uint32)(ptr), + ShmemVariableCache->oldestCatalogXmin); + + /* + * We need to advance our slot's catalog_xmin to keep pace with the + * latest reported position from the master. That way we won't get + * canceled with a recovery conflict when the master sends catalog_xmin + * updates while we're waiting for redo to catch up with the position + * we saw when we started waiting. + * + * A problem arises here when the server sends an + * xl_xact_catalog_xmin_advance with oldestCatalogXmin = 0, indicating + * it is no longer reserving catalogs. 
Since we're creating a slot we + * don't mind, but the redo code does not know that and will treat our + * process as conflicting with recovery. To guard against that we'll + * advance our oldestCatalogXmin to the new + * GetOldestSafeDecodingTransactionId() and redo will ignore slots + * whose catalog_xmin is >= nextXid. So long as we loop faster than the + * maximum standby delay we'll keep ahead of recovery cancellations. + * This means we must take XidGenLock once per loop, but it's not like + * we spend a lot of time creating slots. + * + * It's fine for our catalog_xmin to go backwards when the server + * reports it has nailed down catalog_xmin so we just unconditionally + * reassign our catalog_xmin. + */ + slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); + slot->data.catalog_xmin = slot->effective_catalog_xmin; + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + ret = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 500, WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE); + + if (ret & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (ret & WL_LATCH_SET) + ResetLatch(&MyProc->procLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Notice if the server has reported new WAL since we sent our feedback */ + if (lastWaitWalEnd == firstWaitWalEnd) + lastWaitWalEnd = WalRcv->latestWalEnd; + + /* Update process title if waiting long enough */ + if (update_process_title && new_status == NULL && + TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(), + 500)) + { + const char *old_status; + int len; + + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 8 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting"); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting" */ + } + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + } + + if (TransactionIdPrecedes(slot->effective_catalog_xmin, ShmemVariableCache->oldestCatalogXmin))
+ { + /* + * We didn't reserve the catalog_xmin we wanted, the master has already removed it. + * We have to start decoding at a later point. + */ + slot->effective_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + slot->data.catalog_xmin = slot->effective_catalog_xmin; + } + + ReplicationSlotsComputeRequiredXmin(true); + + /* Tell the master what catalog_xmin we settled on */ + WalRcvForceReply(); + + /* Reset ps display if we changed it */ + if (new_status) + { + set_ps_display(new_status, false); + pfree(new_status); + } + + Assert(TransactionIdFollowsOrEquals(slot->effective_catalog_xmin, ShmemVariableCache->oldestCatalogXmin)); + Assert(LWLockHeldByMe(ProcArrayLock)); +} + +/* + * Test to see if the active logical slot is usable. + */ +static void +EnsureActiveLogicalSlotValid() +{ + Assert(MyReplicationSlot != NULL); + + /* + * Currently a logical slot can only become unusable if we're doing logical + * decoding on standby and the master advanced its catalog_xmin past + * the threshold we need, removing tuples that we'll require to start + * decoding at our restart_lsn. + */ + if (RecoveryInProgress()) + { + /* + * Check if enough catalog is retained for this slot. No locking is needed + * here since oldestCatalogXmin can only advance, so if it's past what + * we need that's not going to change. We have marked our slot as active + * so redo won't replay past our catalog_xmin without first terminating our + * session.
+ */ + TransactionId shmem_catalog_xmin = + *(volatile TransactionId*)(&ShmemVariableCache->oldestCatalogXmin); + + if (!TransactionIdIsValid(shmem_catalog_xmin) || + TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot '%s' requires catalogs removed by master", + NameStr(MyReplicationSlot->data.name)))); + } +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cf814d1..6ca2a00 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -795,6 +795,93 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) return false; } +/* + * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the + * passed database oid. The caller should hold an exclusive lock on the database + * to ensure no replication slots on the database are in use. + * + * If we fail here we'll leave the in-memory state of replication slots + * inconsistent with its on-disk state, so we need to PANIC. + * + * This routine isn't as efficient as it could be - but we don't drop databases + * often, especially databases with lots of slots. + */ +void +ReplicationSlotsDropDBSlots(Oid dboid) +{ + int i; + + if (max_replication_slots <= 0) + return; + + /* + * We only need a shared lock here even though we activate slots, + * because we have an exclusive lock on the database we're dropping + * slots on and don't touch other databases' slots. 
+ */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + NameData slotname; + int active_pid; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* only logical slots are database specific, skip */ + if (!SlotIsLogical(s)) + continue; + + /* not our database, skip */ + if (s->data.database != dboid) + continue; + + /* Claim the slot, as if ReplicationSlotAcquire()ing */ + SpinLockAcquire(&s->mutex); + strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN); + NameStr(slotname)[NAMEDATALEN-1] = '\0'; + active_pid = s->active_pid; + if (active_pid == 0) + { + MyReplicationSlot = s; + s->active_pid = MyProcPid; + } + SpinLockRelease(&s->mutex); + + /* + * The caller should have an exclusive lock on the database so + * we'll never have any in-use slots, but just in case... + */ + if (active_pid) + elog(PANIC, "replication slot %s is in use by pid %d", + NameStr(slotname), active_pid); + + /* + * To avoid largely duplicating ReplicationSlotDropAcquired() or + * complicating it with already_locked flags for ProcArrayLock, + * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we + * just release our ReplicationSlotControlLock to drop the slot. + * + * There's no race here: we acquired this slot, and no slot "behind" + * our scan can be created or become active with our target dboid due + * to our exclusive lock on the DB. 
+ */ + LWLockRelease(ReplicationSlotControlLock); + ReplicationSlotDropAcquired(); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + } + LWLockRelease(ReplicationSlotControlLock); + + /* recompute limits once after all slots are dropped */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); +} + /* * Check whether the server's configuration supports using replication @@ -842,7 +929,9 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. We can't do that on a standby; there we must wait for the + * bgwriter to get around to logging its periodic standby snapshot. + * (TODO: ask walreceiver to ask walsender to log it or ask bgworker to log it) * * That's not needed (or indeed helpful) for physical slots as they'll * start replay at the last logged checkpoint anyway. Instead return diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index cc3cf7d..43b33ef 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -498,9 +498,15 @@ WalReceiverMain(void) * feedback now. Make sure the flag is really set to * false in shared memory before sending the reply, so * we don't miss a new request for a reply. + * + * If logical decoding information is enabled, we also + * send immediate hot standby feedback so as to reduce + * the delay before our needed catalogs are locked in. 
*/ walrcv->force_reply = false; pg_memory_barrier(); + if (XLogLogicalInfoActive()) + XLogWalRcvSendHSFeedback(true); XLogWalRcvSendReply(true, false); } } @@ -1164,8 +1170,8 @@ XLogWalRcvSendHSFeedback(bool immed) { TimestampTz now; TransactionId nextXid; - uint32 nextEpoch; - TransactionId xmin; + uint32 xmin_epoch, catalog_xmin_epoch; + TransactionId xmin, catalog_xmin; static TimestampTz sendTime = 0; static bool master_has_standby_xmin = false; @@ -1206,29 +1212,57 @@ XLogWalRcvSendHSFeedback(bool immed) * everything else has been checked. */ if (hot_standby_feedback) - xmin = GetOldestXmin(NULL, false); + { + /* + * Usually GetOldestXmin() would include the catalog_xmin in its + * calculations, but we don't want to hold upstream back from vacuuming + * normal user table tuples just because they're within the + * catalog_xmin horizon of logical replication slots on this standby. + * Instead we report the catalog_xmin to the upstream separately. + */ + xmin = GetOldestXmin(NULL, + false, /* don't ignore vacuum */ + true /* ignore catalog xmin */); + + /* + * The catalog_xmin reported by GetOldestXmin is the effective + * catalog_xmin used by vacuum, as set by xl_xact_catalog_xmin_advance + * records from the master. Sending it back to the master would be + * circular and prevent its catalog_xmin ever advancing once set. + * We should only send the catalog_xmin we actually need for slots. + */ + ProcArrayGetReplicationSlotXmin(NULL, NULL, &catalog_xmin); + } else + { xmin = InvalidTransactionId; + catalog_xmin = InvalidTransactionId; + } /* * Get epoch and adjust if nextXid and oldestXmin are different sides of * the epoch boundary.
*/ - GetNextXidAndEpoch(&nextXid, &nextEpoch); + GetNextXidAndEpoch(&nextXid, &xmin_epoch); + catalog_xmin_epoch = xmin_epoch; if (nextXid < xmin) - nextEpoch--; + xmin_epoch --; + if (nextXid < catalog_xmin) + catalog_xmin_epoch --; - elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u", - xmin, nextEpoch); + elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", + xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); /* Construct the message and send it. */ resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'h'); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint(&reply_message, xmin, 4); - pq_sendint(&reply_message, nextEpoch, 4); + pq_sendint(&reply_message, xmin_epoch, 4); + pq_sendint(&reply_message, catalog_xmin, 4); + pq_sendint(&reply_message, catalog_xmin_epoch, 4); walrcv_send(wrconn, reply_message.data, reply_message.len); - if (TransactionIdIsValid(xmin)) + if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) master_has_standby_xmin = true; else master_has_standby_xmin = false; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46976ce..33e2c1b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -188,7 +188,6 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -217,6 +216,7 @@ static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static bool TransactionIdInRecentPast(TransactionId xid, uint32 
epoch); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); @@ -1556,6 +1556,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) * be energy wasted - the worst lost information can do here is give us * wrong information in a statistics view - we'll just potentially be more * conservative in removing files. + * + * We don't have to do any effective_xmin / effective_catalog_xmin testing + * here either, like for LogicalConfirmReceivedLocation. If we received + * the xmin and catalog_xmin from downstream replication slots we know they + * were already confirmed there, */ } @@ -1618,7 +1623,7 @@ ProcessStandbyReplyMessage(void) /* compute new replication slot xmin horizon if needed */ static void -PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) +PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin) { bool changed = false; ReplicationSlot *slot = MyReplicationSlot; @@ -1639,6 +1644,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } + /* + * If the physical slot is relaying catalog_xmin for logical replication + * slots on the replica it's safe to act on catalog_xmin advances + * immediately too. The replica will only send a new catalog_xmin via + * feedback when it advances its effective_catalog_xmin, so it's done the + * delay-until-confirmed dance for us and knows it won't need the data + * we're protecting from vacuum again. 
+ */ + if (!TransactionIdIsNormal(slot->data.catalog_xmin) || + !TransactionIdIsNormal(feedbackCatalogXmin) || + TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) + { + changed = true; + slot->data.catalog_xmin = feedbackCatalogXmin; + slot->effective_catalog_xmin = feedbackCatalogXmin; + } SpinLockRelease(&slot->mutex); if (changed) @@ -1649,59 +1670,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) } /* + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. + * + * Epoch of nextXid should be same as standby, or if the counter has + * wrapped, then one greater than standby. + * + * This check doesn't care about whether clog exists for these xids + * at all. + */ +static bool +TransactionIdInRecentPast(TransactionId xid, uint32 epoch) +{ + TransactionId nextXid; + uint32 nextEpoch; + + GetNextXidAndEpoch(&nextXid, &nextEpoch); + + if (xid <= nextXid) + { + if (epoch != nextEpoch) + return false; + } + else + { + if (epoch + 1 != nextEpoch) + return false; + } + + if (!TransactionIdPrecedesOrEquals(xid, nextXid)) + return false; /* epoch OK, but it's wrapped around */ + + return true; +} + +/* * Hot Standby feedback */ static void ProcessStandbyHSFeedbackMessage(void) { - TransactionId nextXid; - uint32 nextEpoch; TransactionId feedbackXmin; uint32 feedbackEpoch; + TransactionId feedbackCatalogXmin; + uint32 feedbackCatalogEpoch; /* * Decipher the reply message. The caller already consumed the msgtype - * byte. + * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation + * of this message. 
*/ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); + feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); + feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u", + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", feedbackXmin, - feedbackEpoch); + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch); - /* Unset WalSender's xmin if the feedback message value is invalid */ - if (!TransactionIdIsNormal(feedbackXmin)) + /* + * Unset WalSender's xmins if the feedback message values are invalid. + * This happens when the downstream turned hot_standby_feedback off. + */ + if (!TransactionIdIsNormal(feedbackXmin) + && !TransactionIdIsNormal(feedbackCatalogXmin)) { MyPgXact->xmin = InvalidTransactionId; if (MyReplicationSlot != NULL) - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); return; } /* * Check that the provided xmin/epoch are sane, that is, not in the future * and not so far back as to be already wrapped around. Ignore if not. - * - * Epoch of nextXid should be same as standby, or if the counter has - * wrapped, then one greater than standby. 
*/ - GetNextXidAndEpoch(&nextXid, &nextEpoch); - - if (feedbackXmin <= nextXid) - { - if (feedbackEpoch != nextEpoch) - return; - } - else - { - if (feedbackEpoch + 1 != nextEpoch) - return; - } + if (TransactionIdIsNormal(feedbackXmin) && + !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch)) + return; - if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid)) - return; /* epoch OK, but it's wrapped around */ + if (TransactionIdIsNormal(feedbackCatalogXmin) && + !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch)) + return; /* * Set the WalSender's xmin equal to the standby's requested xmin, so that @@ -1726,15 +1780,23 @@ ProcessStandbyHSFeedbackMessage(void) * already since a VACUUM could have just finished calling GetOldestXmin.) * * If we're using a replication slot we reserve the xmin via that, - * otherwise via the walsender's PGXACT entry. + * otherwise via the walsender's PGXACT entry. We can only track the + * catalog xmin separately when using a slot, so we store the least + * of the two provided when not using a slot. * * XXX: It might make sense to generalize the ephemeral slot concept and * always use the slot mechanism to handle the feedback xmin. */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? 
*/ - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); else - MyPgXact->xmin = feedbackXmin; + { + if (TransactionIdIsNormal(feedbackCatalogXmin) + && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) + MyPgXact->xmin = feedbackCatalogXmin; + else + MyPgXact->xmin = feedbackXmin; + } } /* @@ -2607,17 +2669,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGUSR1: set flag to send WAL records */ -static void -WalSndXLogSendHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - latch_sigusr1_handler(); - - errno = save_errno; -} - /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) @@ -2651,7 +2702,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 0f63755..e8b21e4 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1291,17 +1291,22 @@ TransactionIdIsActive(TransactionId xid) * process can set its xmin based on transactions that are no longer running * in the master but are still being replayed on the standby, thus possibly * making the GetOldestXmin reading go backwards. In this case there is a - * possibility that we lose data that the standby would like to have, but - * there is little we can do about that --- data is only protected if the - * walsender runs continuously while queries are executed on the standby. - * (The Hot Standby code deals with such cases by failing standby queries - * that needed to access already-removed data, so there's no integrity bug.) 
+ * possibility that we lose data that the standby would like to have + * unless the standby uses a replication slot to make its xmin persistent + * even when it isn't connected. The Hot Standby code deals with such cases by + * failing standby queries that needed to access already-removed data, so + * there's no integrity bug. + * * The return value is also adjusted with vacuum_defer_cleanup_age, so * increasing that setting on the fly is another easy way to make * GetOldestXmin() move backwards, with no consequences for data integrity. + * + * The caller may request that replication slots' catalog_xmin values be + * disregarded when calculating the global xmin. The caller must account + * for catalog_xmin separately. */ TransactionId -GetOldestXmin(Relation rel, bool ignoreVacuum) +GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin) { ProcArrayStruct *arrayP = procArray; TransactionId result; @@ -1375,9 +1380,13 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) } } - /* fetch into volatile var while ProcArrayLock is held */ + /* + * Fetch slot xmins into volatile var while ProcArrayLock is held. Note that + * we're using the effective catalog_xmin for vacuum's tuple removal here, + * as copied over by UpdateOldestCatalogXmin(). + */ replication_slot_xmin = procArray->replication_slot_xmin; - replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; if (RecoveryInProgress()) { @@ -1426,19 +1435,93 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) NormalTransactionIdPrecedes(replication_slot_xmin, result)) result = replication_slot_xmin; + if (!ignoreCatalogXmin && (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel))) + { + /* + * After locks have been released and defer_cleanup_age has been applied, + * check whether we need to back up further to make logical decoding + * safe. 
We need to do so if we're computing the global limit (rel = + * NULL) or if the passed relation is a catalog relation of some kind. + */ + if (TransactionIdIsValid(replication_slot_catalog_xmin) && + NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result)) + result = replication_slot_catalog_xmin; + } + + return result; +} + +/* + * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated + * to reflect an advance in procArray->replication_slot_catalog_xmin or + * it becoming newly set or unset. + * + */ +static bool +CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin) +{ + return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin) + || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin))); +} + +/* + * If necessary, copy the current catalog_xmin needed by replication slots to + * the effective catalog_xmin used for dead tuple removal. + * + * When logical decoding is enabled we write a WAL record before advancing the + * effective value so that standbys find out if catalog tuples they still need + * get removed, and can properly cancel decoding sessions and invalidate slots. + * + * The 'force' option is used when we're turning WAL_LEVEL_LOGICAL off + * and need to clear the shmem state, since we want to bypass the wal_level + * check and force xlog writing. + */ +void +UpdateOldestCatalogXmin(bool force) +{ + TransactionId vacuum_catalog_xmin; + TransactionId slots_catalog_xmin; + + /* + * If we're not recording logical decoding information, catalog_xmin + * must be unset and we don't need to do any work here. + * + * XXX TODO make sure we zero the checkpointed value when we turn logical decoding + * off, and check it during startup!!
+ */ + if (!XLogLogicalInfoActive() && !force) + { + Assert(!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)); + Assert(!TransactionIdIsValid(procArray->replication_slot_catalog_xmin)); + } + + Assert(XLogInsertAllowed()); + /* - * After locks have been released and defer_cleanup_age has been applied, - * check whether we need to back up further to make logical decoding - * possible. We need to do so if we're computing the global limit (rel = - * NULL) or if the passed relation is a catalog relation of some kind. + * Do an unlocked check first. This is obviously race-prone especially + * since replication_slot_catalog_xmin could be updated after we read + * oldestCatalogXmin. But it doesn't matter if we get wrong results here, + * it'll either cause us to take an unnecessary ProcArrayLock to recheck, + * or delay an update until the next vacuum run. */ - if ((rel == NULL || - RelationIsAccessibleInLogicalDecoding(rel)) && - TransactionIdIsValid(replication_slot_catalog_xmin) && - NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result)) - result = replication_slot_catalog_xmin; + vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin); + slots_catalog_xmin = *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin); - return result; + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin) || force) + { + XactLogCatalogXminUpdate(slots_catalog_xmin); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + /* + * A concurrent updater could've changed these values so we need to re-check + * under ProcArrayLock before updating. 
+ */ + vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin); + slots_catalog_xmin = *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin); + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin)) + SetOldestCatalogXmin(slots_catalog_xmin); + LWLockRelease(ProcArrayLock); + } } /* @@ -2166,14 +2249,20 @@ GetOldestSafeDecodingTransactionId(void) oldestSafeXid = ShmemVariableCache->nextXid; /* - * If there's already a slot pegging the xmin horizon, we can start with - * that value, it's guaranteed to be safe since it's computed by this - * routine initially and has been enforced since. + * If there's already an effectiveCatalogXmin held down by vacuum + * it's definitely safe to start there, and it can't advance + * while we hold ProcArrayLock. */ - if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && - TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, + if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) && + TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, oldestSafeXid)) - oldestSafeXid = procArray->replication_slot_catalog_xmin; + oldestSafeXid = ShmemVariableCache->oldestCatalogXmin; + + /* + * TODO: If we're on replica and using hot standby feedback to set catalog_xmin + * we should be able to directly check the value reserved by feedback via shmem + * from walreceiver, even if xlog replay hasn't passed that point yet. + */ /* * If we're not in recovery, we walk over the procarray and collect the @@ -2655,6 +2744,53 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) } /* + * Notify a logical decoding session that it conflicts with a + * newly set catalog_xmin from the master. 
+ */ +void +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid) +{ + ProcArrayStruct *arrayP = procArray; + int index; + + /* + * We have to scan ProcArray to find the process and set a pending recovery + * conflict even though we know the pid. At least we can get the BackendId + * and avoid a ProcSignal scan later. + * + * The pid might've gone away, in which case we got the desired + * outcome anyway. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGPROC *proc = &allProcs[pgprocno]; + + if (proc->pid == session_pid) + { + VirtualTransactionId procvxid; + + GET_VXID_FROM_PGPROC(procvxid, *proc); + + proc->recoveryConflictPending = true; + + /* + * Kill the pid if it's still here. If not, that's what we + * wanted so ignore any errors. + */ + (void) SendProcSignal(session_pid, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, procvxid.backendId); + + break; + } + } + + LWLockRelease(ProcArrayLock); +} + +/* * MinimumActiveBackends --- count backends (other than myself) that are * in active transactions. Return true if the count exceeds the * minimum threshold passed. This is used as a heuristic to decide if @@ -2929,18 +3065,29 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, * * Return the current slot xmin limits. That's useful to be able to remove * data that's older than those limits. + * + * For logical replication slots' catalog_xmin, we return both the effective + * catalog_xmin being used for tuple removal (retained catalog_xmin) and the + * catalog_xmin actually needed by replication slots (needed_catalog_xmin). + * retained_catalog_xmin should be older than needed_catalog_xmin but is not + * guaranteed to be if there are replication slots on a replica currently + * attempting to start up and reserve catalogs.
*/ void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin) + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin) { LWLockAcquire(ProcArrayLock, LW_SHARED); if (xmin != NULL) *xmin = procArray->replication_slot_xmin; - if (catalog_xmin != NULL) - *catalog_xmin = procArray->replication_slot_catalog_xmin; + if (retained_catalog_xmin != NULL) + *retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + + if (needed_catalog_xmin != NULL) + *needed_catalog_xmin = procArray->replication_slot_catalog_xmin; LWLockRelease(ProcArrayLock); } diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index a3d6ac5..d17dba1 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 112fe07..8e3a3b7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -29,6 +29,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "replication/slot.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -152,7 +153,9 @@ GetStandbyLimitTime(void) static int standbyWait_us = STANDBY_INITIAL_WAIT_US; /* - * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs. + * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and + * ResolveRecoveryConflictWithLogicalDecoding. + * * We wait here for a while then return. 
If we decide we can't wait any * more then we return true, if we can wait some more return false. */ @@ -1108,3 +1111,145 @@ LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs, nmsgs * sizeof(SharedInvalidationMessage)); XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS); } + +/* + * Scan to see if any clients are using replication slots that are below the + * new catalog_xmin threshold and signal them to terminate with a recovery + * conflict. + * + * We already applied the new catalog_xmin record and updated the shmem + * catalog_xmin state, so new clients that try to use a replication slot + * whose on-disk catalog_xmin is below the new threshold will ERROR, and we + * don't have to guard against them here. + * + * Replay can only continue safely once every slot that needs the catalogs + * we're going to free for removal is gone. So if any conflicting sessions + * exist, wait for any standby conflict grace period then signal them to exit. + * + * The master might clear its reserved catalog_xmin if all upstream slots are + * removed or clear their feedback reservations, sending us + * InvalidTransactionId. If we're concurrently trying to create a new slot and + * reserve catalogs the InvalidXid reservation report might come in while we + * have a slot waiting for hs_feedback confirmation of its reservation. That + * would cause the waiting process to get canceled with a conflict with + * recovery here since its tentative reservation conflicts with the master's + * report of 'nothing reserved'. To allow it to continue to seek a startpoint + * we ignore slots whose catalog_xmin is >= nextXid, indicating that they're + * still looking for where to start. We'll sometimes notice a conflict but the + * slot will advance its catalog_xmin to a more recent nextXid and cease to + * conflict when we re-check. (The alternative is to track slots being created + * differently to slots actively decoding in shmem, which seems unnecessary.
Or + * to separate the 'tentative catalog_xmin reservation' of a slot from its + * actual needed catalog_xmin.) + * + * We can't use ResolveRecoveryConflictWithVirtualXIDs() here because + * walsender-based logical decoding sessions won't have any virtualxid for much + * of their life and the end of their virtualxids doesn't mean the end of a + * potential conflict. It would also cancel too aggressively, since it cares + * about the backend's xmin and logical decoding only needs the catalog_xmin. + */ +void +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin) +{ + int i; + + if (!InHotStandby) + /* nobody can be actively using logical slots */ + return; + + /* Already applied new limit, can't have replayed later one yet */ + Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin); + + /* + * Find the first conflicting active slot and wait for it to be free, + * signalling it if necessary, then repeat until there are no more + * conflicts. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + pid_t active_pid; + + /* Reset standby wait back-off delay for each session waited for */ + standbyWait_us = STANDBY_INITIAL_WAIT_US; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + /* + * Physical slots can have a catalog_xmin, but if we're an intermediate + * cascading standby all we do is pass the catalog_xmin up to our + * master and relay WAL down to the cascaded replica. Conflicts are the + * cascaded replica's problem. + */ + if (!(slot->in_use && SlotIsLogical(slot))) + continue; + + /* + * We only care about the effective_catalog_xmin of in-use logical slots. + * Inactive slots have the same effective and actual catalog_xmin, and + * we'll detect conflicts with those when an attempt is made to use + * them. Active slots' catalog_xmin can't go backwards unless they + * become inactive.
+ * + * We specifically ignore catalog_xmin reservations >= nextXid here to allow + * for slots still being created; see function comment. + */ + while (slot->in_use && slot->active_pid != 0 && + TransactionIdIsValid(slot->effective_catalog_xmin) && + (!TransactionIdIsValid(new_catalog_xmin) || + TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin)) && + TransactionIdPrecedes(slot->effective_catalog_xmin, ShmemVariableCache->nextXid)) + { + /* + * Wait for the conflicting session to exit, signalling it with + * a conflict if necessary. + * + * We'll sleep here, so release the replication slot control lock. No + * new conflicts can appear "behind" our scan of the replication_slots + * array because sessions check the oldestCatalogXmin on decoding + * startup. This lets the exiting backend clear the slot's + * active_pid. + */ + active_pid = slot->active_pid; + LWLockRelease(ReplicationSlotControlLock); + + if (WaitExceedsMaxStandbyDelay()) + { + /* + * Only signal if the slot's active_pid is unchanged, to guard + * against pid reassignment. The read is unlocked; the worst + * outcome even for a garbage read is an extra sleep, or with + * terrible luck (same pid re-used in the same slot position) a + * spurious cancel. Don't "continue" when it changed: the control + * lock is released here and must be re-acquired below before + * the loop condition is re-checked. + */ + if (active_pid == slot->active_pid) + { + ereport(INFO, + (errmsg("terminating logical decoding session due to recovery conflict"), + errdetail("Pid %u requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.", + active_pid, slot->effective_catalog_xmin, + NameStr(slot->data.name), new_catalog_xmin))); + + CancelLogicalDecodingSessionWithRecoveryConflict(active_pid); + + /* + * Wait a little bit for it to die so that we avoid flooding + * an unresponsive backend when system is heavily loaded.
+ */ + pg_usleep(5000L); + } + } + + /* + * We need to re-acquire the lock before re-checking the slot or + * continuing the scan. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + } + + } + LWLockRelease(ReplicationSlotControlLock); +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index b179231..5cf92ac 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2262,6 +2262,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: + errdetail("Logical replication slot requires catalog rows that will be removed."); + break; case PROCSIG_RECOVERY_CONFLICT_DATABASE: errdetail("User was connected to a database that must be dropped."); break; @@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS) /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() - * and StatementCancelHandler(). Called only by a normal user backend - * that begins a transaction during recovery. + * and StatementCancelHandler(). + * + * Called by normal user backends running during recovery. Also used by the + * walsender to handle recovery conflicts with logical decoding, and by + * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery + * conflicts.
*/ void RecoveryConflictInterrupt(ProcSignalReason reason) @@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional drop through to session cancel */ + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; @@ -2795,12 +2803,13 @@ RecoveryConflictInterrupt(ProcSignalReason reason) Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending)); /* - * All conflicts apart from database cause dynamic errors where the - * command or transaction can be retried at a later point with some - * potential for success. No need to reset this, since non-retryable - * conflict errors are currently FATAL. + * All conflicts apart from database and catalog_xmin cause dynamic + * errors where the command or transaction can be retried at a later + * point with some potential for success. No need to reset this, since + * non-retryable conflict errors are currently FATAL. */ - if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE) + if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) RecoveryConflictRetryable = false; } @@ -2855,11 +2864,20 @@ ProcessInterrupts(void) } else if (RecoveryConflictPending) { - /* Currently there is only one non-retryable recovery conflict */ - Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE); + int code; + + Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + + if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) + /* XXX more appropriate error code? 
*/ + code = ERRCODE_PROGRAM_LIMIT_EXCEEDED; + else + code = ERRCODE_DATABASE_DROPPED; + pgstat_report_recovery_conflict(RecoveryConflictReason); ereport(FATAL, - (errcode(ERRCODE_DATABASE_DROPPED), + (errcode(code), errmsg("terminating connection due to conflict with recovery"), errdetail_recovery_conflict())); } diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 20077a6..3bad417 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -242,6 +242,8 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"), + ControlFile->checkPointCopy.oldestCatalogXmin); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 969eff9..50f68e8 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -134,6 +134,10 @@ typedef struct VariableCacheData */ TransactionId latestCompletedXid; /* newest XID that has committed or * aborted */ + + TransactionId oldestCatalogXmin; /* oldest xid where complete catalog state + * is guaranteed to still exist */ + } VariableCacheData; typedef VariableCacheData *VariableCache; @@ -173,6 +177,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact); extern TransactionId ReadNewTransactionId(void); extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid); +extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin); extern bool ForceTransactionIdLimitUpdate(void); extern Oid GetNewObjectId(void); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a123d2a..17e4306 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -118,7 +118,7 @@ 
typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_CATALOG_XMIN_ADV 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -167,6 +167,13 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) +typedef struct xl_xact_catalog_xmin_advance +{ + TransactionId new_catalog_xmin; +} xl_xact_catalog_xmin_advance; + +#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId)) + /* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. So we @@ -370,6 +377,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, TransactionId twophase_xid); + +extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 0bc41ab..df19adc 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -43,6 +43,7 @@ typedef struct CheckPoint MultiXactOffset nextMultiOffset; /* next free MultiXact offset */ TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */ Oid oldestXidDB; /* database with minimum datfrozenxid */ + TransactionId oldestCatalogXmin; /* catalog retained after this xid */ MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */ Oid oldestMultiDB; /* database with minimum datminmxid */ pg_time_t time; /* time stamp of checkpoint */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 282f8ae..515479d 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -745,7 +745,8 @@ typedef 
enum WAIT_EVENT_SYSLOGGER_MAIN, WAIT_EVENT_WAL_RECEIVER_MAIN, WAIT_EVENT_WAL_SENDER_MAIN, - WAIT_EVENT_WAL_WRITER_MAIN + WAIT_EVENT_WAL_WRITER_MAIN, + WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE } WaitEventActivity; /* ---------- diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index b653e5c..5638130 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); +extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 28dc1fc..1a771e7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -116,6 +116,9 @@ typedef struct /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. + * + * If hot standby feedback is enabled, a hot standby feedback message + * will also be sent. 
*/ bool force_reply; diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index dd37c0c..0592aff 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); -extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum); +extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin); extern TransactionId GetOldestActiveTransactionId(void); extern TransactionId GetOldestSafeDecodingTransactionId(void); @@ -78,6 +78,8 @@ extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared); +extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid); + extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); @@ -86,6 +88,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked); extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin); + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin); + +extern void UpdateOldestCatalogXmin(bool force); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index f67b982..8e37e29 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, NUM_PROCSIGNALS /* Must be last! 
*/ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index dcebf72..cc04186 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -34,6 +34,8 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid); extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag); extern void ResolveRecoveryConflictWithBufferPin(void); +extern void ResolveRecoveryConflictWithLogicalDecoding( + TransactionId new_catalog_xmin); extern void CheckRecoveryConflictDeadlock(void); extern void StandbyDeadLockHandler(void); extern void StandbyTimeoutHandler(void); diff --git a/src/test/recovery/t/010_logical_decoding_on_replica.pl b/src/test/recovery/t/010_logical_decoding_on_replica.pl new file mode 100644 index 0000000..3f57230 --- /dev/null +++ b/src/test/recovery/t/010_logical_decoding_on_replica.pl @@ -0,0 +1,454 @@ +# Demonstrate that logical can follow timeline switches. +# +# Test logical decoding on a standby. +# +use strict; +use warnings; +use 5.8.0; + +use PostgresNode; +use TestLib; +use Test::More tests => 63; +use RecursiveCopy; +use File::Copy; +use Time::HiRes; + +my ($stdin, $stdout, $stderr, $ret, $handle, $return); +my $backup_name; + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +hot_standby_feedback = on +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +# very promptly terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->psql('postgres', q[CREATE DATABASE testdb]); + +$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]); +$backup_name = 'b1'; +my 
$backup_dir = $node_master->backup_dir . "/" . $backup_name; +TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--xlog-method=stream', '--write-recovery-conf', '--slot=decoding_standby'); + +open(my $fh, "<", $backup_dir . "/recovery.conf") + or die "can't open recovery.conf"; + +my $found = 0; +while (my $line = <$fh>) +{ + chomp($line); + if ($line eq "primary_slot_name = 'decoding_standby'") + { + $found = 1; + last; + } +} +ok($found, "using physical slot for standby"); + +sub print_phys_xmin +{ + my $slot = $node_master->slot('decoding_standby'); + return ($slot->{'xmin'}, $slot->{'catalog_xmin'}); +} + +my ($xmin, $catalog_xmin) = print_phys_xmin(); +# without the catalog_xmin hot standby feedback patch, catalog_xmin is always null +# and xmin is the min(xmin, catalog_xmin) of all slots on the standby + anything else +# holding down xmin. +ok(!$xmin, "xmin null"); +ok(!$catalog_xmin, "catalog_xmin null"); + +my $node_replica = get_new_node('replica'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); + +$node_replica->start; + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +($xmin, $catalog_xmin) = print_phys_xmin(); +ok($xmin, "xmin not null"); +ok(!$catalog_xmin, "catalog_xmin null"); + +# Create new slots on the replica, ignoring the ones on the master completely. 
+diag "creating slot standby_logical"; +my $start_time = [Time::HiRes::gettimeofday()]; +is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]), + 0, 'logical slot creation on standby succeeded'); +diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time)); + +sub print_logical_xmin +{ + my $slot = $node_replica->slot('standby_logical'); + return ($slot->{'xmin'}, $slot->{'catalog_xmin'}); +} + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +($xmin, $catalog_xmin) = print_phys_xmin(); +isnt($xmin, '', "physical xmin not null"); +isnt($catalog_xmin, '', "physical catalog_xmin not null"); + +($xmin, $catalog_xmin) = print_logical_xmin(); +is($xmin, '', "logical xmin null"); +isnt($catalog_xmin, '', "logical catalog_xmin not null"); + +$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)'); +$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +($xmin, $catalog_xmin) = print_phys_xmin(); +isnt($xmin, '', "physical xmin not null"); +isnt($catalog_xmin, '', "physical catalog_xmin not null"); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($ret, 0, 'replay from slot succeeded'); +is($stdout, q{BEGIN +table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks' +COMMIT}, 'replay results match'); +is($stderr, '', 'stderr is empty'); + +$node_master->wait_for_catchup($node_replica,
'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin(); +isnt($physical_xmin, '', "physical xmin not null"); +isnt($physical_catalog_xmin, '', "physical catalog_xmin not null"); + +my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin(); +is($logical_xmin, '', "logical xmin null"); +isnt($logical_catalog_xmin, '', "logical catalog_xmin not null"); + +# Ok, do a pile of tx's and make sure xmin advances. +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot, +# we hold down xmin. +$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]); +for my $i (0 .. 2000) +{ + $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]); +} +$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]); +$node_master->safe_psql('testdb', 'VACUUM'); + +my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin(); +cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes"); + +($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($ret, 0, 'replay of big series succeeded'); +isnt($stdout, '', 'replayed some rows'); + +($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin(); +is($new_logical_xmin, '', "logical xmin null"); +isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null"); +cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes"); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); 
+sleep(2); # ensure walreceiver feedback sent + +my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin(); +isnt($new_physical_xmin, '', "physical xmin not null"); +# hot standby feedback should advance phys catalog_xmin now the standby's slot +# doesn't hold it down as far. +isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null"); +cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced"); + +cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on'); + +######################################################### +# Upstream catalog retention +######################################################### + +sub test_catalog_xmin_retention() +{ + # First burn some xids on the master in another DB, so we push the master's + # nextXid ahead. + foreach my $i (1 .. 100) + { + $node_master->safe_psql('postgres', 'SELECT txid_current()'); + } + + # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance + # past our needed xmin. The only way we have visibility into that is to force + # a checkpoint. 
+ $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'"); + foreach my $dbname ('template1', 'postgres', 'testdb', 'template0') + { + $node_master->safe_psql($dbname, 'VACUUM FREEZE'); + } + sleep(1); + $node_master->safe_psql('postgres', 'CHECKPOINT'); + IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout) + or die "pg_controldata failed with $?"; + my @checkpoint = split('\n', $stdout); + my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', ''); + foreach my $line (@checkpoint) + { + if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/) + { + $nextXid = $1; + } + if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/) + { + $oldestXid = $1; + } + if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/) + { + $oldestCatalogXmin = $1; + } + } + die 'no oldestXID found in checkpoint' unless $oldestXid; + + my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin(); + my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin(); + + diag "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin"; + + $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'"); + + return ($oldestXid, $oldestCatalogXmin); +} + +diag "Testing catalog_xmin retention with hs_feedback on"; +my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention(); + +cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on'); +cmp_ok($oldestCatalogXmin, ">=", $oldestXid, "oldestCatalogXmin >= oldestXid"); +cmp_ok($oldestCatalogXmin, "<=", $new_logical_catalog_xmin, "oldestCatalogXmin <= downstream catalog_xmin"); + +######################################################### +# Conflict with recovery: xmin cancels decoding
session +######################################################### +# +# Start a transaction on the replica then perform work that should cause a +# recovery conflict with it. We'll check to make sure the client gets +# terminated with recovery conflict. +# +# Temporarily disable hs feedback so we can test recovery conflicts. +# It's fine to continue using a physical slot, the xmin should be +# cleared. We only check hot_standby_feedback when establishing +# an initial decoding session so this approach circumvents the safeguards +# in place and forces a conflict. +# +# We'll also create an unrelated table so we can drop it later, making +# sure there are catalog changes to replay. +$node_master->safe_psql('testdb', 'CREATE TABLE dummy_table(blah integer)'); + +# Start pg_recvlogical before we turn off hs_feedback so its slot's +# catalog_xmin is above the downstream's catalog_threshold when we start +# decoding. +$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); + +$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off'); +$node_replica->reload; + +sleep(2); + +($xmin, $catalog_xmin) = print_phys_xmin(); +is($xmin, '', "physical xmin null after hs_feedback disabled"); +is($catalog_xmin, '', "physical catalog_xmin null after hs_feedback disabled"); + +# Burn a bunch of XIDs and make sure upstream catalog_xmin is past what we'll +# need here +($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention(); +cmp_ok($oldestXid, ">", $new_logical_catalog_xmin, 'upstream oldestXid advanced past downstream catalog_xmin with hs_feedback off'); +cmp_ok($oldestCatalogXmin, "==", 0, "oldestCatalogXmin = InvalidTransactionId with hs_feedback off"); + +# Data-only changes, no effect on catalogs. We should replay them fine +# without a conflict, since they advance xmin but not catalog_xmin. 
+$node_master->safe_psql('testdb', 'DELETE FROM test_table'); +$node_master->safe_psql('testdb', 'VACUUM FULL test_table'); +$node_master->safe_psql('testdb', 'VACUUM;'); + +diag "waiting for catchup"; +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); + +diag "pumping"; +$handle->pump; +diag "pumped"; + +# If we change the catalogs, we'll get a conflict with recovery, but only +# if there's an active xact when decoding. Logical decoding +# doesn't keep a virtualxid while waiting for WAL, only when calling output +# plugins, so this won't work damn. +diag "dropping dummy_table"; +$node_master->safe_psql('testdb', 'DROP TABLE dummy_table;'); + +diag "waiting for catchup"; +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +diag "caught up, waiting for client"; + +# client dies? +eval { + $handle->finish; +}; +$return = $?; +if ($return) { + is($return, 256, "pg_recvlogical terminated by server on recovery conflict"); + like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict errmsg'); + like($stderr, qr/requires catalog rows that will be removed/, 'pg_recvlogical exited with catalog_xmin conflict'); +} +else +{ + fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'"); +} + +##################################################################### +# Conflict with recovery: refuse to run without hot_standby_feedback +##################################################################### +# +# When hot_standby_feedback is off, new connections should fail. 
+# + +IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); +is($?, 256, 'pg_recvlogical failed to connect to slot while hot_standby_feedback off'); +like($stderr, qr/hot_standby_feedback/, 'recvlogical recovery conflict errmsg'); + +##################################################################### +# Conflict with recovery: catalog_xmin advance invalidates idle slot +##################################################################### +# +# The slot that pg_recvlogical was using before it was terminated +# should not accept new connections now, since its catalog_xmin +# is lower than the replica's threshold. Even once we re-enable +# hot_standby_feedback, the removed tuples won't somehow come back. +# + +$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on'); +$node_replica->reload; +sleep(2); + +IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); +is($?, 256, 'pg_recvlogical failed to connect to slot with past catalog_xmin'); +like($stderr, qr/replication slot '.*' requires catalogs removed by master/, 'recvlogical recovery conflict errmsg'); + + +################################################## +# Drop slot +################################################## +# +is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on'); + +# Make sure slots on replicas are droppable, and properly clear the upstream's xmin +$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]); + +is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +sleep(2); # ensure walreceiver feedback sent + +($xmin, $catalog_xmin) = 
print_phys_xmin(); +is($catalog_xmin, '', "physical catalog_xmin null"); + + + + +################################################## +# Recovery: drop database drops idle slots +################################################## + +# Create a couple of slots on the DB to ensure they are dropped when we drop +# the DB on the upstream if they're on the right DB, or not dropped if on +# another DB. + +diag "Testing dropdb when downstream slot is not in-use"; +diag "creating slot dodropslot"; +$start_time = [Time::HiRes::gettimeofday()]; +$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot'); +diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time)); +diag "creating slot otherslot"; +$start_time = [Time::HiRes::gettimeofday()]; +$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot'); +diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time)); + +is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created'); +is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created'); + +# dropdb on the master to verify slots are dropped on standby +$node_master->safe_psql('postgres', q[DROP DATABASE testdb]); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); + +is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f', + 'database dropped on standby'); + +is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped'); +is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped'); + + +################################################## +# Recovery: drop
database drops in-use slots +################################################## + +# This time, have the slot in-use on the downstream DB when we drop it. +diag "Testing dropdb when downstream slot is in-use"; +$node_master->psql('postgres', q[CREATE DATABASE testdb2]); + +diag "creaitng slot dodropslot2"; +$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot']); +is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created'); + +# make sure the slot is in use +diag "starting pg_recvlogical"; +$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); +sleep(1); + +is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active') + or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'"); + +diag "pg_recvlogical backend pid is " . 
$node_replica->slot('dodropslot2')->{'active_pid'}; + +# Master doesn't know the replica's slot is busy so dropdb should succeed +$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]); +ok(1, 'dropdb finished'); + +while ($node_replica->slot('dodropslot2')->{'active_pid'}) +{ + sleep(1); + diag "waiting for walsender to exit"; +} + +diag "walsender exited, waiting for pg_recvlogical to exit"; + +# our client should've terminated in response to the walsender error +eval { + $handle->finish; +}; +$return = $?; +if ($return) { + is($return, 256, "pg_recvlogical terminated by server"); + like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict'); + like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db'); +} + +is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited'); + +# The slot should be dropped by recovery now +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); + +is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f', + 'database dropped on standby'); + +is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped'); -- 2.5.5