From df131729f4c7e86f5b441d0861d7195515939855 Mon Sep 17 00:00:00 2001 From: Wu Hao Date: Sun, 9 Aug 2020 10:53:58 +0530 Subject: [PATCH v2] Start WAL receiver before startup process replays existing WAL If WAL receiver is started only after startup process finishes replaying WAL already available in pg_wal, synchornous replication is impacted adversly. Consider a temporary network outage causing streaming replication connection to break. This leads to exit of WAL receiver process. If the startup process has fallen behind, it may take a long time to finish replaying WAL and then start walreceiver again to re-establish streaming replication. Commits on master will have to wait all this while for the standby to flush WAL upto commit LSN. This experience can be alleviated if replication connection is re-established as soon as it is found to be disconnected. The patch attempts to do so by starting WAL receiver as soon as consistent state is reached. The start point to request streaming from is set to the beginning of the most recently flushed WAL segment. To determine this, the startup process scans first page of segments, stating from the segment currently being read, one file at a time. A new GUC, wal_receiver_start_condition, controls the new behavior. When set to 'consistency', the new behavior takes effect. The default value is 'replay', which keeps the current behavior. A TAP test is added to demonstrate the problem and validate the fix. Discussion: https://www.postgresql.org/message-id/CANXE4TewY1WNgu5J5ek38RD%2B2m9F2K%3DfgbWubjv9yG0BeyFxRQ%40mail.gmail.com https://www.postgresql.org/message-id/b271715f-f945-35b0-d1f5-c9de3e56f65e@postgrespro.ru Co-authored-by: Asim R P --- src/backend/access/transam/xlog.c | 119 ++++++++++- src/backend/replication/walreceiver.c | 1 + src/backend/replication/walreceiverfuncs.c | 20 +- src/backend/utils/misc/guc.c | 17 ++ src/include/replication/walreceiver.h | 7 + src/test/recovery/t/018_replay_lag_syncrep.pl | 192 ++++++++++++++++++ 6 files changed, 345 insertions(+), 11 deletions(-) create mode 100644 src/test/recovery/t/018_replay_lag_syncrep.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 756b838e6a5..6192f5be347 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5642,6 +5642,98 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) (errmsg("archive recovery complete"))); } +static int +XLogReadFirstPage(XLogRecPtr targetPagePtr, char *readBuf) +{ + int fd; + XLogSegNo segno; + char xlogfname[MAXFNAMELEN]; + + XLByteToSeg(targetPagePtr, segno, wal_segment_size); + elog(DEBUG3, "reading first page of segment %lu", segno); + fd = XLogFileReadAnyTLI(segno, LOG, XLOG_FROM_PG_WAL); + if (fd == -1) + return -1; + + /* Seek to the beginning, we want to check if the first page is valid */ + if (lseek(fd, (off_t) 0, SEEK_SET) < 0) + { + XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size); + close(fd); + elog(ERROR, "could not seek XLOG file %s, segment %lu: %m", + xlogfname, segno); + } + + if (read(fd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + close(fd); + elog(ERROR, "could not read from XLOG file %s, segment %lu: %m", + xlogfname, segno); + } + + close(fd); + return XLOG_BLCKSZ; +} + +/* + * Find the LSN that points to the beginning of the segment file most recently + * flushed by WAL receiver. It will be used as start point by new instance of + * WAL receiver. + * + * The XLogReaderState abstraction is not suited for this purpose. The + * interface it offers is XLogReadRecord, which is not suited to read a + * specific page from WAL. + */ +static XLogRecPtr +GetLastLSN(XLogRecPtr lsn) +{ + XLogSegNo lastValidSegNo; + char readBuf[XLOG_BLCKSZ]; + + XLByteToSeg(lsn, lastValidSegNo, wal_segment_size); + /* + * We know that lsn falls in a valid segment. Start searching from the + * next segment. + */ + XLogSegNoOffsetToRecPtr(lastValidSegNo+1, 0, wal_segment_size, lsn); + + elog(LOG, "scanning WAL for last valid segment, starting from %X/%X", + (uint32) (lsn >> 32), (uint32) lsn); + + while (XLogReadFirstPage(lsn, readBuf) == XLOG_BLCKSZ) + { + /* + * Validate page header, it must be a long header because we are + * inspecting the first page in a segment file. The big if condition + * is modelled according to XLogReaderValidatePageHeader. + */ + XLogLongPageHeader longhdr = (XLogLongPageHeader) readBuf; + if ((longhdr->std.xlp_info & XLP_LONG_HEADER) == 0 || + (longhdr->std.xlp_magic != XLOG_PAGE_MAGIC) || + ((longhdr->std.xlp_info & ~XLP_ALL_FLAGS) != 0) || + (longhdr->xlp_sysid != ControlFile->system_identifier) || + (longhdr->xlp_seg_size != wal_segment_size) || + (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) || + (longhdr->std.xlp_pageaddr != lsn) || + (longhdr->std.xlp_tli != ThisTimeLineID)) + { + break; + } + XLByteToSeg(lsn, lastValidSegNo, wal_segment_size); + XLogSegNoOffsetToRecPtr(lastValidSegNo+1, 0, wal_segment_size, lsn); + } + + /* + * The last valid segment number is previous to the one that was just + * found to be invalid. + */ + XLogSegNoOffsetToRecPtr(lastValidSegNo, 0, wal_segment_size, lsn); + + elog(LOG, "last valid segment number = %lu", lastValidSegNo); + + return lsn; +} + /* * Extract timestamp from WAL record. * @@ -7205,6 +7297,27 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* + * Start WAL receiver without waiting for startup process to + * finish replay, so that streaming replication is established + * at the earliest. When the replication is configured to be + * synchronous this would unblock commits waiting for WAL to + * be written and/or flushed by synchronous standby. + */ + if (StandbyModeRequested && + reachedConsistency && + wal_receiver_start_condition == WAL_RCV_START_AT_CONSISTENCY && + !WalRcvStreaming()) + { + XLogRecPtr startpoint = GetLastLSN(record->xl_prev); + elog(LOG, "starting WAL receiver, startpoint %X/%X", + (uint32) (startpoint >> 32), (uint32) startpoint); + RequestXLogStreaming(ThisTimeLineID, + startpoint, + PrimaryConnInfo, + PrimarySlotName); + } + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -12259,12 +12372,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, case XLOG_FROM_ARCHIVE: case XLOG_FROM_PG_WAL: - /* - * WAL receiver must not be running when reading WAL from - * archive or pg_wal. - */ - Assert(!WalRcvStreaming()); - /* Close any old file we might have open. */ if (readFile >= 0) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index d5a9b568a68..a1a144d7fd6 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -88,6 +88,7 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; +int wal_receiver_start_condition; /* libpqwalreceiver connection */ static WalReceiverConn *wrconn = NULL; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index e6757573010..904327d8302 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -245,10 +245,6 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, SpinLockAcquire(&walrcv->mutex); - /* It better be stopped if we try to restart it */ - Assert(walrcv->walRcvState == WALRCV_STOPPED || - walrcv->walRcvState == WALRCV_WAITING); - if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else @@ -271,12 +267,26 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, walrcv->is_temp_slot = create_temp_slot; } + /* + * We used to assert that the WAL receiver is either in WALRCV_STOPPED or + * in WALRCV_WAITING state. + * + * Such an assertion is not possible, now that this function is called by + * startup process on two occasions. One is just before starting to + * replay WAL when starting up. And the other is when it has finished + * replaying all WAL in pg_xlog directory. If the standby is starting up + * after clean shutdown, there is not much WAL to be replayed and both + * calls to this funcion can occur in quick succession. By the time the + * second request to start streaming is made, the WAL receiver can be in + * any state. We therefore cannot make any assertion on the state here. + */ + if (walrcv->walRcvState == WALRCV_STOPPED) { launch = true; walrcv->walRcvState = WALRCV_STARTING; } - else + else if (walrcv->walRcvState == WALRCV_WAITING) walrcv->walRcvState = WALRCV_RESTARTING; walrcv->startTime = now; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index de87ad6ef70..e7ce9a4e87a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -237,6 +237,12 @@ static ConfigVariable *ProcessConfigFileInternal(GucContext context, * NOTE! Option values may not contain double quotes! */ +const struct config_enum_entry wal_rcv_start_options[] = { + {"catchup", WAL_RCV_START_AT_CATCHUP, true}, + {"consistency", WAL_RCV_START_AT_CONSISTENCY, true}, + {NULL, 0, false} +}; + static const struct config_enum_entry bytea_output_options[] = { {"escape", BYTEA_OUTPUT_ESCAPE, false}, {"hex", BYTEA_OUTPUT_HEX, false}, @@ -4784,6 +4790,17 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"wal_receiver_start_condition", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("When to start WAL receiver."), + NULL, + }, + &wal_receiver_start_condition, + WAL_RCV_START_AT_CATCHUP, + wal_rcv_start_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c2d5dbee549..db5aeed74ce 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -23,10 +23,17 @@ #include "storage/spin.h" #include "utils/tuplestore.h" +typedef enum +{ + WAL_RCV_START_AT_CATCHUP, /* start a WAL receiver after replaying all WAL files */ + WAL_RCV_START_AT_CONSISTENCY /* start a WAL receiver once consistency has been reached */ +} WalRcvStartCondition; + /* user-settable parameters */ extern int wal_receiver_status_interval; extern int wal_receiver_timeout; extern bool hot_standby_feedback; +extern int wal_receiver_start_condition; /* * MAXCONNINFO: maximum size of a connection string. diff --git a/src/test/recovery/t/018_replay_lag_syncrep.pl b/src/test/recovery/t/018_replay_lag_syncrep.pl new file mode 100644 index 00000000000..e82d8a0a64b --- /dev/null +++ b/src/test/recovery/t/018_replay_lag_syncrep.pl @@ -0,0 +1,192 @@ +# Test impact of replay lag on synchronous replication. +# +# Replay lag is induced using recovery_min_apply_delay GUC. Two ways +# of breaking replication connection are covered - killing walsender +# and restarting standby. The test expects that replication +# connection is restored without being affected due to replay lag. +# This is validated by performing commits on master after replication +# connection is disconnected and checking that they finish within a +# few seconds. + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Query checking sync_priority and sync_state of each standby +my $check_sql = + "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication ORDER BY application_name;"; + +# Check that sync_state of a standby is expected (waiting till it is). +# If $setting is given, synchronous_standby_names is set to it and +# the configuration file is reloaded before the test. +sub test_sync_state +{ + my ($self, $expected, $msg, $setting) = @_; + + if (defined($setting)) + { + $self->safe_psql('postgres', + "ALTER SYSTEM SET synchronous_standby_names = '$setting';"); + $self->reload; + } + + ok($self->poll_query_until('postgres', $check_sql, $expected), $msg); + return; +} + +# Start a standby and check that it is registered within the WAL sender +# array of the given primary. This polls the primary's pg_stat_replication +# until the standby is confirmed as registered. +sub start_standby_and_wait +{ + my ($master, $standby) = @_; + my $master_name = $master->name; + my $standby_name = $standby->name; + my $query = + "SELECT count(1) = 1 FROM pg_stat_replication WHERE application_name = '$standby_name'"; + + $standby->start; + + print("### Waiting for standby \"$standby_name\" on \"$master_name\"\n"); + $master->poll_query_until('postgres', $query); + return; +} + +# Initialize master node +my $node_master = get_new_node('master'); +my @extra = (q[--wal-segsize], q[1]); +$node_master->init(allows_streaming => 1, extra => \@extra); +$node_master->start; +my $backup_name = 'master_backup'; + +# Setup physical replication slot for streaming replication +$node_master->safe_psql('postgres', + q[SELECT pg_create_physical_replication_slot('phys_slot', true, false);]); + +# Take backup +$node_master->backup($backup_name); + +# Create standby linking to master +my $node_standby = get_new_node('standby'); +$node_standby->init_from_backup($node_master, $backup_name, + has_streaming => 1); +$node_standby->append_conf('postgresql.conf', + q[primary_slot_name = 'phys_slot']); +# Enable debug logging in standby +$node_standby->append_conf('postgresql.conf', + q[log_min_messages = debug5]); +# Enable early WAL receiver startup +$node_standby->append_conf('postgresql.conf', + q[wal_receiver_start_condition = 'consistency']); + +start_standby_and_wait($node_master, $node_standby); + +# Make standby synchronous +test_sync_state( + $node_master, + qq(standby|1|sync), + 'standby is synchronous', + 'standby'); + +# Slow down WAL replay by inducing 10 seconds sleep before replaying +# a commit WAL record. +$node_standby->safe_psql('postgres', + 'ALTER SYSTEM set recovery_min_apply_delay TO 10000;'); +$node_standby->reload; + +# Commit some transactions on master to induce replay lag in standby. +$node_master->safe_psql('postgres', 'CREATE TABLE replay_lag_test(a int);'); +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (101);'); +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (102);'); +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (103);'); + +# Obtain WAL sender PID and kill it. +my $walsender_pid = $node_master->safe_psql( + 'postgres', + q[select active_pid from pg_get_replication_slots() where slot_name = 'phys_slot']); + +# Kill walsender, so that the replication connection breaks. +kill 'SIGTERM', $walsender_pid; + +# The replication connection should be re-establised much earlier than +# what it takes to finish replay. Try to commit a transaction with a +# timeout of recovery_min_apply_delay + 2 seconds. The timeout should +# not be hit. +my $timed_out = 0; +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (1);', + timeout => 12, + timed_out => \$timed_out); + +is($timed_out, 0, 'insert after WAL receiver restart'); + +my $replay_lag = $node_master->safe_psql( + 'postgres', + 'select flush_lsn - replay_lsn from pg_stat_replication'); +print("replay lag after WAL receiver restart: $replay_lag\n"); +ok($replay_lag > 0, 'replication resumes in spite of replay lag'); + +# Break the replication connection by restarting standby. +$node_standby->restart; + +# Like in previous test, the replication connection should be +# re-establised before pending WAL replay is finished. Try to commit +# a transaction with recovery_min_apply_delay + 2 second timeout. The +# timeout should not be hit. +$timed_out = 0; +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (2);', + timeout => 12, + timed_out => \$timed_out); + +is($timed_out, 0, 'insert after standby restart'); +$replay_lag = $node_master->safe_psql( + 'postgres', + 'select flush_lsn - replay_lsn from pg_stat_replication'); +print("replay lag after standby restart: $replay_lag\n"); +ok($replay_lag > 0, 'replication starts in spite of replay lag'); + +# Reset the delay so that the replay process is no longer slowed down. +$node_standby->safe_psql('postgres', 'ALTER SYSTEM set recovery_min_apply_delay to 0;'); +$node_standby->reload; + +# Switch to a new WAL file and see if things work well. +$node_master->safe_psql( + 'postgres', + 'select pg_switch_wal();'); + +# Transactions should work fine on master. +$timed_out = 0; +$node_master->safe_psql( + 'postgres', + 'insert into replay_lag_test values (3);', + timeout => 1, + timed_out => \$timed_out); + +# Wait for standby to replay all WAL. +$node_master->wait_for_catchup('standby', 'replay', + $node_master->lsn('insert')); + +# Standby should also have identical content. +my $count_sql = q[select count(*) from replay_lag_test;]; +my $expected = q[6]; +ok($node_standby->poll_query_until('postgres', $count_sql, $expected), 'standby query'); + +# Test that promotion followed by query works. +$node_standby->promote; +$node_master->stop; +$node_standby->safe_psql('postgres', 'insert into replay_lag_test values (4);'); + +$expected = q[7]; +ok($node_standby->poll_query_until('postgres', $count_sql, $expected), + 'standby query after promotion'); -- 2.24.2 (Apple Git-127)