From e303c2f8706c7a54460ab66fd2d1d0196361a99a Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Wed, 22 Mar 2017 12:29:13 +0800 Subject: [PATCH 2/2] Report catalog_xmin separately to xmin in hot standby feedback The catalog_xmin of slots on a standby was reported as part of the standby's xmin, causing the master's xmin to be held down. This could cause considerable unnecessary bloat on the master. Instead, report catalog_xmin as a separate field in hot_standby_feedback. If the upstream walsender is using a physical replication slot, store the catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a slot and has only a PGPROC entry behaviour doesn't change, as we store the combined xmin and catalog_xmin in the PGPROC entry. There's no backward compatibility concern here, as nothing except another postgres instance of the same major version has any business sending hot standby feedback and it's only used on the physical replication protocol. e Please enter the commit message for your changes. Lines starting --- doc/src/sgml/protocol.sgml | 33 ++++++- src/backend/replication/walreceiver.c | 43 ++++++-- src/backend/replication/walsender.c | 110 +++++++++++++++------ .../recovery/t/010_logical_decoding_timelines.pl | 38 ++++++- 4 files changed, 175 insertions(+), 49 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 244e381..d8786f0 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1911,10 +1911,11 @@ The commands accepted in walsender mode are: - The standby's current xmin. This may be 0, if the standby is - sending notification that Hot Standby feedback will no longer - be sent on this connection. Later non-zero messages may - reinitiate the feedback mechanism. + The standby's current global xmin, excluding the catalog_xmin from any + replication slots. If both this value and the following + catalog_xmin are 0 this is treated as a notification that Hot Standby + feedback will no longer be sent on this connection. Later non-zero + messages may reinitiate the feedback mechanism. @@ -1924,7 +1925,29 @@ The commands accepted in walsender mode are: - The standby's current epoch. + The epoch of the global xmin xid on the standby. + + + + + + Int32 + + + + The lowest catalog_xmin of any replication slots on the standby. Set to 0 + if no catalog_xmin exists on the standby or if hot standby feedback is being + disabled. + + + + + + Int32 + + + + The epoch of the catalog_xmin xid on the standby. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1ab8e0..60c1aba 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed) { TimestampTz now; TransactionId nextXid; - uint32 nextEpoch; - TransactionId xmin; + uint32 xmin_epoch, catalog_xmin_epoch; + TransactionId xmin, catalog_xmin; static TimestampTz sendTime = 0; /* initially true so we always send at least one feedback message */ static bool master_has_standby_xmin = true; @@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed) * everything else has been checked. */ if (hot_standby_feedback) - xmin = GetOldestXmin(NULL, false, false); + { + /* + * Usually GetOldestXmin() would include the catalog_xmin in its + * calculations, but we don't want to hold upstream back from vacuuming + * normal user table tuples just because they're within the + * catalog_xmin horizon of logical replication slots on this standby. + * Instead we report the catalog_xmin to the upstream separately. + */ + xmin = GetOldestXmin(NULL, + false, /* don't ignore vacuum */ + true /* ignore catalog xmin */); + + /* + * Obtain catalog_xmin to send separately, so the walsender can store + * it on a physical slot's catalog_xmin if one is in use. + */ + ProcArrayGetReplicationSlotXmin(NULL, &catalog_xmin); + } else + { xmin = InvalidTransactionId; + catalog_xmin = InvalidTransactionId; + } /* * Get epoch and adjust if nextXid and oldestXmin are different sides of * the epoch boundary. */ - GetNextXidAndEpoch(&nextXid, &nextEpoch); + GetNextXidAndEpoch(&nextXid, &xmin_epoch); + catalog_xmin_epoch = xmin_epoch; if (nextXid < xmin) - nextEpoch--; + xmin_epoch --; + if (nextXid < catalog_xmin) + catalog_xmin_epoch --; - elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u", - xmin, nextEpoch); + elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", + xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); /* Construct the message and send it. */ resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'h'); pq_sendint64(&reply_message, GetCurrentTimestamp()); pq_sendint(&reply_message, xmin, 4); - pq_sendint(&reply_message, nextEpoch, 4); + pq_sendint(&reply_message, xmin_epoch, 4); + pq_sendint(&reply_message, catalog_xmin, 4); + pq_sendint(&reply_message, catalog_xmin_epoch, 4); walrcv_send(wrconn, reply_message.data, reply_message.len); - if (TransactionIdIsValid(xmin)) + if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) master_has_standby_xmin = true; else master_has_standby_xmin = false; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 7561770..05b51a0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -221,6 +221,7 @@ static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); @@ -1605,7 +1606,7 @@ ProcessStandbyReplyMessage(void) /* compute new replication slot xmin horizon if needed */ static void -PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) +PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin) { bool changed = false; ReplicationSlot *slot = MyReplicationSlot; @@ -1626,6 +1627,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } + if (!TransactionIdIsNormal(slot->data.catalog_xmin) || + !TransactionIdIsNormal(feedbackCatalogXmin) || + TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) + { + changed = true; + slot->data.catalog_xmin = feedbackCatalogXmin; + slot->effective_catalog_xmin = feedbackCatalogXmin; + } SpinLockRelease(&slot->mutex); if (changed) @@ -1636,59 +1645,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) } /* - * Hot Standby feedback + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. + * + * Epoch of nextXid should be same as standby, or if the counter has + * wrapped, then one greater than standby. + * + * This check doesn't care about whether clog exists for these xids + * at all. */ -static void -ProcessStandbyHSFeedbackMessage(void) +static bool +TransactionIdInRecentPast(TransactionId xid, uint32 epoch) { TransactionId nextXid; uint32 nextEpoch; + + GetNextXidAndEpoch(&nextXid, &nextEpoch); + + if (xid <= nextXid) + { + if (epoch != nextEpoch) + return false; + } + else + { + if (epoch + 1 != nextEpoch) + return false; + } + + if (!TransactionIdPrecedesOrEquals(xid, nextXid)) + return false; /* epoch OK, but it's wrapped around */ + + return true; +} + +/* + * Hot Standby feedback + */ +static void +ProcessStandbyHSFeedbackMessage(void) +{ TransactionId feedbackXmin; uint32 feedbackEpoch; + TransactionId feedbackCatalogXmin; + uint32 feedbackCatalogEpoch; /* * Decipher the reply message. The caller already consumed the msgtype - * byte. + * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation + * of this message. */ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); + feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); + feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u", + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", feedbackXmin, - feedbackEpoch); + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch); - /* Unset WalSender's xmin if the feedback message value is invalid */ - if (!TransactionIdIsNormal(feedbackXmin)) + /* + * Unset WalSender's xmins if the feedback message values are invalid. + * This happens when the downstream turned hot_standby_feedback off. + */ + if (!TransactionIdIsNormal(feedbackXmin) + && !TransactionIdIsNormal(feedbackCatalogXmin)) { MyPgXact->xmin = InvalidTransactionId; if (MyReplicationSlot != NULL) - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); return; } /* * Check that the provided xmin/epoch are sane, that is, not in the future * and not so far back as to be already wrapped around. Ignore if not. - * - * Epoch of nextXid should be same as standby, or if the counter has - * wrapped, then one greater than standby. */ - GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (TransactionIdIsNormal(feedbackXmin) && + !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch)) + return; - if (feedbackXmin <= nextXid) - { - if (feedbackEpoch != nextEpoch) - return; - } - else - { - if (feedbackEpoch + 1 != nextEpoch) - return; - } - - if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid)) - return; /* epoch OK, but it's wrapped around */ + if (TransactionIdIsNormal(feedbackCatalogXmin) && + !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch)) + return; /* * Set the WalSender's xmin equal to the standby's requested xmin, so that @@ -1713,15 +1755,23 @@ ProcessStandbyHSFeedbackMessage(void) * already since a VACUUM could have just finished calling GetOldestXmin.) * * If we're using a replication slot we reserve the xmin via that, - * otherwise via the walsender's PGXACT entry. + * otherwise via the walsender's PGXACT entry. We can only track the + * catalog xmin separately when using a slot, so we store the least + * of the two provided when not using a slot. * * XXX: It might make sense to generalize the ephemeral slot concept and * always use the slot mechanism to handle the feedback xmin. */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); else - MyPgXact->xmin = feedbackXmin; + { + if (TransactionIdIsNormal(feedbackCatalogXmin) + && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) + MyPgXact->xmin = feedbackCatalogXmin; + else + MyPgXact->xmin = feedbackXmin; + } } /* diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl index 09830dc..4561a06 100644 --- a/src/test/recovery/t/010_logical_decoding_timelines.pl +++ b/src/test/recovery/t/010_logical_decoding_timelines.pl @@ -20,7 +20,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 10; use RecursiveCopy; use File::Copy; use IPC::Run (); @@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret); # Initialize master node my $node_master = get_new_node('master'); $node_master->init(allows_streaming => 1, has_archiving => 1); -$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); -$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); -$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); -$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); +$node_master->append_conf('postgresql.conf', q[ +wal_level = 'logical' +max_replication_slots = 3 +max_wal_senders = 2 +log_min_messages = 'debug2' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +]); $node_master->dump_info; $node_master->start; @@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;'); my $backup_name = 'b1'; $node_master->backup_fs_hot($backup_name); +$node_master->safe_psql('postgres', + q[SELECT pg_create_physical_replication_slot('phys_slot');]); + my $node_replica = get_new_node('replica'); $node_replica->init_from_backup( $node_master, $backup_name, has_streaming => 1, has_restoring => 1); +$node_replica->append_conf( + 'recovery.conf', q[primary_slot_name = 'phys_slot']); + $node_replica->start; $node_master->safe_psql('postgres', @@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres', is($stdout, 'before_basebackup', 'Expected to find only slot before_basebackup on replica'); +# Examine the physical slot the replica uses to stream changes +# from the master to make sure its hot_standby_feedback +# has locked in a catalog_xmin on the physical slot, and that +# any xmin is < the catalog_xmin +$node_master->poll_query_until('postgres', q[ + SELECT catalog_xmin IS NOT NULL + FROM pg_replication_slots + WHERE slot_name = 'phys_slot' + ]); +my $phys_slot = $node_master->slot('phys_slot'); +isnt($phys_slot->{'xmin'}, '', + 'xmin assigned on physical slot of master'); +isnt($phys_slot->{'catalog_xmin'}, '', + 'catalog_xmin assigned on physical slot of master'); +# Ignore wrap-around here, we're on a new cluster: +cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'}, + 'xmin on physical slot must not be lower than catalog_xmin'); + # Boom, crash $node_master->stop('immediate'); -- 2.5.5