From 7ad6f7c0127c1d6cdbb7ce6ab55a28d3d07933fd Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Thu, 1 Sep 2016 10:16:55 +0800 Subject: [PATCH 06/10] Follow timeline switches in logical decoding When decoding from a logical slot, it's necessary for xlog reading to be able to read xlog from historical (i.e. not current) timelines. Otherwise decoding fails after failover to a physical replica because the oldest still-needed archives are in the historical timeline. Supporting logical decoding timeline following is a pre-requisite for logical decoding on physical standby servers. It also makes it possible to promote a replica with logical slots to a master and replay from those slots, allowing logical decoding applications to follow physical failover. Logical slots cannot actually be created on a replica without use of the low-level C slot management APIs so this is mostly foundation work for subsequent changes to enable logical decoding on standbys. Tests are included to exercise the functionality using a cold disk-level copy of the master that's started up as a replica with slots intact, but the intended use of the functionality is with logical decoding on a standby. Note that an earlier version of logical decoding timeline following was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5 feature freeze when issues were discovered too late to safely fix them in the 9.5 release cycle. The prior approach failed to consider that a record could be split across pages that are on different segments, where the new segment contains the start of a new timeline. In that case the old segment might be missing or renamed with a .partial suffix. This patch reworks the logic to be page-based and in the process simplifies how the last timeline for a segment is looked up. 
--- src/backend/access/transam/xlogutils.c | 200 +++++++++++++++++++-- src/backend/replication/logical/logicalfuncs.c | 7 +- src/backend/replication/walsender.c | 11 +- src/include/access/xlogreader.h | 16 ++ src/include/access/xlogutils.h | 3 + src/test/recovery/Makefile | 2 + .../recovery/t/009_logical_decoding_timelines.pl | 130 ++++++++++++++ 7 files changed, 347 insertions(+), 22 deletions(-) create mode 100644 src/test/recovery/t/009_logical_decoding_timelines.pl diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 51a8e8d..ab15cf3 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,6 +19,7 @@ #include +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; + static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || + sendTLI != tli) { char path[MAXPGPATH]; @@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + sendTLI = tli; } /* Need to seek in the file? */ @@ -750,6 +754,129 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } /* + * Determine which timeline to read an xlog page from and set the + * XLogReaderState's currTLI to that timeline ID. 
+ * + * It's necessary to care about timelines in xlogreader and logical decoding + * when we might be reading xlog generated prior to a promotion, either if + * we're currently a standby in recovery or if we're a promoted master reading + * xlogs generated by the old master before our promotion. Notably, logical + * decoding on a standby needs to be able to replay any remaining pending data + * from the old timeline when the standby or one of its upstreams is + * promoted. + * + * wantPage must be set to the start address of the page to read and + * wantLength to the amount of the page that will be read, up to + * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ. + * + * We switch to an xlog segment from the new timeline eagerly when on a + * historical timeline, as soon as we reach the start of the xlog segment + * containing the timeline switch. The server copied the segment to the new + * timeline so all the data up to the switch point is the same, but there's no + * guarantee the old segment will still exist. It may have been deleted or + * renamed with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * We can't just check the timeline when we read a page on a different segment + * to the last page. We could've received a timeline switch from a cascading + * upstream, so the current segment ends and we have to switch to a new one. + * Even in the middle of reading a page we could have to dump the cached page + * and switch to a new TLI. + * + * Because of this, callers MAY NOT assume that currTLI is the timeline that + * will be in a page's xlp_tli; the page may begin on an older timeline or we + * might be reading from historical timeline data on a segment that's been + * copied to a new timeline. 
+ * + * The caller must also make sure it doesn't read past the current replay + * position if executing in recovery, so it doesn't fail to notice that the + * current timeline became historical. + */ +void +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +{ + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff; + + Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); + Assert(wantLength <= XLOG_BLCKSZ); + Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); + + /* + * If the desired page is currently read in and valid, we have nothing to do. + * + * The caller should've ensured that it didn't previously advance readOff + * past the valid limit of this timeline, so it doesn't matter if the current + * TLI has since become historical. + */ + if (lastReadPage == wantPage && + state->readLen != 0 && + lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1)) + return; + + /* + * If we're reading from the current timeline, it hasn't become historical + * and the page we're reading is after the last page read, we can again + * just carry on. (Seeking backwards requires a check to make sure the older + * page isn't on a prior timeline). + */ + if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + { + Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + return; + } + + /* + * If we're just reading pages from a previously validated historical + * timeline and the timeline we're reading from is valid until the + * end of the current segment we can just keep reading. 
+ */ + if (state->currTLIValidUntil != InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0 && + (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize) + return; + + /* + * If we reach this point we're either looking up a page for random access, + * the current timeline just became historical, or we're reading from a new + * segment containing a timeline switch. In all cases we need to determine + * the newest timeline on the segment. + * + * If it's the current timeline we can just keep reading from here unless + * we detect a timeline switch that makes the current timeline historical. + * If it's a historical timeline we can read all the segment on the newest + * timeline because it contains all the old timelines' data too. So only + * one switch check is required. + */ + { + /* + * We need to re-read the timeline history in case it's been changed + * by a promotion or replay from a cascaded replica. + */ + List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + + XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1; + + Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize); + + /* Find the timeline of the last LSN on the segment containing wantPage. 
+ */ + state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory); + state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, + &state->nextTLI); + + Assert(state->currTLIValidUntil == InvalidXLogRecPtr || + wantPage + wantLength < state->currTLIValidUntil); + + list_free_deep(timelineHistory); + + elog(DEBUG3, "switched to timeline %u valid until %X/%X", + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil)); + } +} + +/* * read_page callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another @@ -770,28 +897,71 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; loc = targetPagePtr + reqLen; + + /* Make sure enough xlog is available... */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. We can't rely on RecoveryInProgress() because + * in a standby configuration like + * + * A => B => C + * + * if we're doing logical decoding on C, and B gets promoted, our timeline + * will change while we remain in recovery. 
*/ - if (!RecoveryInProgress()) + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + + if (state->currTLI == ThisTimeLineID) { - *pageTLI = ThisTimeLineID; - read_upto = GetFlushRecPtr(); + /* + * We're reading from the current timeline so we might have to + * wait for the desired record to be generated (or, for a standby, + * received & replayed) + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + read_upto = GetFlushRecPtr(); + } + else + read_upto = GetXLogReplayRecPtr(pageTLI); + + if (loc <= read_upto) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - read_upto = GetXLogReplayRecPtr(pageTLI); - - if (loc <= read_upto) + { + /* + * We're on a historical timeline, so limit reading to the switch + * point where we moved to the next timeline. + * + * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know + * about the new timeline, so we must've received past the end of + * it. + */ + read_upto = state->currTLIValidUntil; + + /* + * Setting pageTLI to our wanted record's TLI is slightly wrong; + * the page might begin on an older timeline if it contains a + * timeline switch, since its xlog segment will have been copied + * from the prior timeline. This is pretty harmless though, as + * nothing cares so long as the timeline doesn't go backwards. We + * should read the page header instead; FIXME someday. 
+ */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } if (targetPagePtr + XLOG_BLCKSZ <= read_upto) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 318726e..a8f7b76 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -234,13 +234,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; + ReplicationSlotAcquire(NameStr(*name)); + /* compute the current end-of-wal */ if (!RecoveryInProgress()) end_of_wal = GetFlushRecPtr(); else - end_of_wal = GetXLogReplayRecPtr(NULL); - - ReplicationSlotAcquire(NameStr(*name)); + end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); PG_TRY(); { @@ -279,6 +279,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5cdb8a0..acb3370 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -47,6 +47,7 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -760,6 +761,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLogRecPtr flushptr; int count; + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + sendTimeLine = state->currTLI; + sendTimeLineValidUpto = 
state->currTLIValidUntil; + sendTimeLineNextTLI = state->nextTLI; + /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -992,10 +999,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* setup state for XLogReadPage */ - sendTimeLineIsHistoric = false; - sendTimeLine = ThisTimeLineID; - /* * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index deaa7f5..8f96728 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -160,6 +160,22 @@ struct XLogReaderState /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; + /* timeline to read it from, 0 if a lookup is required */ + TimeLineID currTLI; + /* + * Safe point to read to in currTLI if current TLI is historical + * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline. + * + * Actually set to the start of the segment containing the timeline + * switch that ends currTLI's validity, not the LSN of the switch + * itself, since we can't assume the old segment will be present. + */ + XLogRecPtr currTLIValidUntil; + /* + * If currTLI is not the most recent known timeline, the next timeline to + * read from when currTLIValidUntil is reached. 
+ */ + TimeLineID nextTLI; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index d027ea1..f0ee352 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); +extern void XLogReadDetermineTimeline(XLogReaderState *state, + XLogRecPtr wantPage, uint32 wantLength); + #endif diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index a847952..d2ff1e9 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,6 +9,8 @@ # #------------------------------------------------------------------------- +EXTRA_INSTALL=contrib/test_decoding + subdir = src/test/recovery top_builddir = ../../.. include $(top_builddir)/src/Makefile.global diff --git a/src/test/recovery/t/009_logical_decoding_timelines.pl b/src/test/recovery/t/009_logical_decoding_timelines.pl new file mode 100644 index 0000000..09830dc --- /dev/null +++ b/src/test/recovery/t/009_logical_decoding_timelines.pl @@ -0,0 +1,130 @@ +# Demonstrate that logical decoding can follow timeline switches. +# +# Logical replication slots can follow timeline switches but it's +# normally not possible to have a logical slot on a replica where +# promotion and a timeline switch can occur. The only ways +# we can create that circumstance are: +# +# * By doing a filesystem-level copy of the DB, since pg_basebackup +# excludes pg_replslot but we can copy it directly; or +# +# * by creating a slot directly at the C level on the replica and +# advancing it as we go using the low level APIs. It can't be done +# from SQL since logical decoding isn't allowed on replicas. +# +# This module uses the first approach to show that timeline following +# on a logical slot works. 
+# +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 7; +use RecursiveCopy; +use File::Copy; +use IPC::Run (); +use Scalar::Util qw(blessed); + +my ($stdout, $stderr, $ret); + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); +$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); +$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); +$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); +$node_master->dump_info; +$node_master->start; + +diag "Testing logical timeline following with a filesystem-level copy"; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('beforebb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my $backup_name = 'b1'; +$node_master->backup_fs_hot($backup_name); + +my $node_replica = get_new_node('replica'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_replica->start; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('afterbb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +# Verify that only the before base_backup slot is on the replica +$stdout = $node_replica->safe_psql('postgres', + 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, 'before_basebackup', + 'Expected to find only slot before_basebackup on replica'); + +# Boom, crash +$node_master->stop('immediate'); + +$node_replica->promote; 
+$node_replica->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();") or die "replica never exited recovery"; + +$node_replica->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('after failover');"); + +# Shouldn't be able to read from slot created after base backup +($ret, $stdout, $stderr) = $node_replica->psql('postgres', +"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');" +); +is($ret, 3, 'replaying from after_basebackup slot fails'); +like( + $stderr, + qr/replication slot "after_basebackup" does not exist/, + 'after_basebackup slot missing'); + +# Should be able to read from slot created before base backup +($ret, $stdout, $stderr) = $node_replica->psql( + 'postgres', +"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", + timeout => 30); +is($ret, 0, 'replay from slot before_basebackup succeeds'); + +my $final_expected_output_bb = q(BEGIN +table public.decoding: INSERT: blah[text]:'beforebb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT); +is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup'); +is($stderr, '', 'replay from slot before_basebackup produces no stderr'); + +# So far we've peeked the slots, so when we fetch the same info over +# pg_recvlogical we should get complete results. First, find out the commit lsn +# of the last transaction. There's no max(pg_lsn), so: + +my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;"); + +# now use the walsender protocol to peek the slot changes and make sure we see +# the same results. 
+ +$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup', + $endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1'); + +# walsender likes to add a newline +chomp($stdout); +is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup'); + +# We don't need the standby anymore +$node_replica->teardown_node(); -- 2.5.5