From edd12ca55a7454ccaaba5b7bf7d4a34a15ef4707 Mon Sep 17 00:00:00 2001
From: Wu Hao
Date: Fri, 17 Jan 2020 18:14:43 +0530
Subject: [PATCH v1 2/3] Start WAL receiver before startup process replays
 existing WAL

If the WAL receiver is started only after the startup process finishes
replaying the WAL already available in pg_wal, synchronous replication
is adversely affected.  Consider a temporary network outage that breaks
the streaming replication connection.  This leads to the exit of the
WAL receiver process.  If the startup process has fallen behind, it may
take a long time to finish replaying WAL and only then start the WAL
receiver again to re-establish streaming replication.  Commits on the
master have to wait all this while for the standby to flush WAL up to
the commit LSN.

This can be alleviated if the replication connection is re-established
as soon as it is found to be disconnected.  The patch makes this happen
by starting the WAL receiver from the postmaster's ServerLoop as well
as from the startup process, even before WAL replay begins.

The start point to request streaming from is remembered in pg_control.
Before creating a new WAL segment file, the WAL receiver records the
new WAL segment number in pg_control.  If the WAL receiver process
exits and must restart, the recorded segment number is used to generate
a start point, that is, the first offset in that segment file, from
which to re-establish streaming replication.

Alternatives we considered (but did not implement) for persisting the
start point:

(1) The postgresql.auto.conf file, similar to how primary_conninfo is
remembered.  This option requires creating a new GUC that represents
the start point.  The start point is never set by a user, so using a
GUC to represent it does not seem appropriate.

(2) Introduce a new flat file.  This incurs the overhead of maintaining
an additional flat file.

Co-authored-by: Asim R P
---
 src/backend/access/transam/xlog.c          | 28 +++++++++++++
 src/backend/replication/walreceiver.c      | 67 ++++++++++++++++++++++++++++++
 src/backend/replication/walreceiverfuncs.c | 20 ++++++---
 src/bin/pg_controldata/pg_controldata.c    |  4 ++
 src/include/catalog/pg_control.h           |  7 ++++
 5 files changed, 121 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7f4f784c0e..a87bd78f96 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6973,6 +6973,34 @@ StartupXLOG(void)
 		}
 	}
 
+	/*
+	 * Start WAL receiver without waiting for the startup process to finish
+	 * replay, so that streaming replication is established at the earliest
+	 * opportunity.  When replication is configured to be synchronous, this
+	 * unblocks commits waiting for WAL to be written and/or flushed by the
+	 * synchronous standby.
+	 */
+	if (StandbyModeRequested)
+	{
+		XLogRecPtr	startpoint;
+		XLogSegNo	startseg;
+		TimeLineID	startpointTLI;
+		LWLockAcquire(ControlFileLock, LW_SHARED);
+		startseg = ControlFile->lastFlushedSeg;
+		startpointTLI = ControlFile->lastFlushedSegTLI;
+		LWLockRelease(ControlFileLock);
+		if (startpointTLI > 0)
+		{
+			elog(LOG, "found last flushed segment %lu on timeline %u, starting WAL receiver",
+				 startseg, startpointTLI);
+			XLogSegNoOffsetToRecPtr(startseg, 0, wal_segment_size, startpoint);
+			RequestXLogStreaming(startpointTLI,
+								 startpoint,
+								 PrimaryConnInfo,
+								 PrimarySlotName);
+		}
+	}
+
 	/* Initialize resource managers */
 	for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 	{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a5e85d32f3..c862b65cae 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -50,6 +50,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "common/ip.h"
 #include "funcapi.h"
@@ -82,6 +83,8 @@ bool		hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
+static ControlFileData *ControlFile = NULL;
+
 #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
 
 /*
@@ -163,6 +166,45 @@ ProcessWalRcvInterrupts(void)
 }
 
 
+/*
+ * Persist the startpoint to the pg_control file.  This is used to start
+ * replication without waiting for the startup process to tell us where to
+ * start streaming from.
+ */
+static void
+SaveStartPoint(XLogRecPtr startpoint, TimeLineID startpointTLI)
+{
+	XLogSegNo	oldseg, startseg;
+	TimeLineID	oldTLI;
+
+	XLByteToSeg(startpoint, startseg, wal_segment_size);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+#ifdef USE_ASSERT_CHECKING
+	/*
+	 * On a given timeline, the WAL segment to start streaming from should
+	 * never move backwards.
+	 */
+	if (ControlFile->lastFlushedSegTLI == startpointTLI)
+		Assert(ControlFile->lastFlushedSeg <= startseg);
+#endif
+
+	oldseg = ControlFile->lastFlushedSeg;
+	oldTLI = ControlFile->lastFlushedSegTLI;
+	if (oldseg < startseg || oldTLI != startpointTLI)
+	{
+		ControlFile->lastFlushedSeg = startseg;
+		ControlFile->lastFlushedSegTLI = startpointTLI;
+		UpdateControlFile();
+		elog(DEBUG3,
+			 "lastFlushedSeg (seg, TLI) old: (%lu, %u), new: (%lu, %u)",
+			 oldseg, oldTLI, startseg, startpointTLI);
+	}
+
+	LWLockRelease(ControlFileLock);
+}
+
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
@@ -304,6 +346,10 @@ WalReceiverMain(void)
 	if (sender_host)
 		pfree(sender_host);
 
+	bool		found;
+	ControlFile = ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
+	Assert(found);
+
 	first_stream = true;
 	for (;;)
 	{
@@ -1055,6 +1101,27 @@ XLogWalRcvFlush(bool dying)
 	/* Also let the master know that we made some progress */
 	if (!dying)
 	{
+		/*
+		 * When a WAL segment file is completely filled,
+		 * LogstreamResult.Flush points to the beginning of the new WAL
+		 * segment file that will be created shortly.  Before sending a
+		 * reply with an LSN from the new WAL segment for the first time,
+		 * remember the LSN in pg_control.  The LSN is used as the
+		 * startpoint to resume streaming if the WAL receiver process
+		 * exits and starts again.
+		 *
+		 * It is important to update the LSN's segment number in
+		 * pg_control before including it in a reply back to the WAL
+		 * sender.  Once the WAL sender receives the flush LSN in a standby
+		 * reply, any older WAL segments that do not contain the flush LSN
+		 * may be cleaned up.  If the WAL receiver dies after sending a
+		 * reply but before updating pg_control, it is possible that the
+		 * starting segment saved in pg_control is no longer available on
+		 * the master when it attempts to resume streaming.
+		 */
+		if (XLogSegmentOffset(LogstreamResult.Flush, wal_segment_size) == 0)
+			SaveStartPoint(LogstreamResult.Flush, ThisTimeLineID);
+
 		XLogWalRcvSendReply(false, false);
 		XLogWalRcvSendHSFeedback(false);
 	}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 89c903e45a..955b8fcf83 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -239,10 +239,6 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	SpinLockAcquire(&walrcv->mutex);
 
-	/* It better be stopped if we try to restart it */
-	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
-		   walrcv->walRcvState == WALRCV_WAITING);
-
 	if (conninfo != NULL)
 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
 	else
@@ -253,12 +249,26 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 	else
 		walrcv->slotname[0] = '\0';
 
+	/*
+	 * We used to assert that the WAL receiver is either in WALRCV_STOPPED or
+	 * in WALRCV_WAITING state.
+	 *
+	 * Such an assertion is no longer possible, now that this function is
+	 * called by the startup process on two occasions: once just before
+	 * starting to replay WAL, and again when it has finished replaying all
+	 * WAL in the pg_wal directory.  If the standby is starting up after a
+	 * clean shutdown, there is not much WAL to be replayed and both calls to
+	 * this function can occur in quick succession.  By the time the second
+	 * request to start streaming is made, the WAL receiver can be in any
+	 * state.  We therefore cannot make any assertion on the state here.
+	 */
+
 	if (walrcv->walRcvState == WALRCV_STOPPED)
 	{
 		launch = true;
 		walrcv->walRcvState = WALRCV_STARTING;
 	}
-	else
+	else if (walrcv->walRcvState == WALRCV_WAITING)
 		walrcv->walRcvState = WALRCV_RESTARTING;
 
 	walrcv->startTime = now;
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 19e21ab491..f98f36ffe5 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -234,6 +234,10 @@ main(int argc, char *argv[])
 		   dbState(ControlFile->state));
 	printf(_("pg_control last modified:             %s\n"),
 		   pgctime_str);
+	printf(_("Latest flushed WAL segment number:    %lu\n"),
+		   ControlFile->lastFlushedSeg);
+	printf(_("Latest flushed TimeLineID:            %u\n"),
+		   ControlFile->lastFlushedSegTLI);
 	printf(_("Latest checkpoint location:           %X/%X\n"),
 		   (uint32) (ControlFile->checkPoint >> 32),
 		   (uint32) ControlFile->checkPoint);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index de5670e538..27260bbea5 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -143,6 +143,11 @@ typedef struct ControlFileData
 	 * to disk, we mustn't start up until we reach X again. Zero when not
 	 * doing archive recovery.
 	 *
+	 * lastFlushedSeg is the segment number of the WAL file most recently
+	 * flushed by the walreceiver.  It is updated by the walreceiver when a
+	 * received WAL record falls on a new WAL segment file.  This is used as
+	 * the start point to resume WAL streaming if it is stopped.
+	 *
 	 * backupStartPoint is the redo pointer of the backup start checkpoint, if
 	 * we are recovering from an online backup and haven't reached the end of
 	 * backup yet. It is reset to zero when the end of backup is reached, and
@@ -165,6 +170,8 @@ typedef struct ControlFileData
 	 */
 	XLogRecPtr	minRecoveryPoint;
 	TimeLineID	minRecoveryPointTLI;
+	XLogSegNo	lastFlushedSeg;
+	TimeLineID	lastFlushedSegTLI;
 	XLogRecPtr	backupStartPoint;
 	XLogRecPtr	backupEndPoint;
 	bool		backupEndRequired;
-- 
2.14.3 (Apple Git-98)
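P.S. For readers unfamiliar with the segment arithmetic the patch relies on,
here is a minimal standalone sketch (not part of the patch) of how the
lastFlushedSeg value persisted in pg_control maps back to a streaming start
point.  The helper name seg_start_lsn() and the hard-coded 16MB segment size
are illustrative assumptions; the multiplication simply mirrors what
XLogSegNoOffsetToRecPtr(seg, 0, wal_segment_size, dest) computes in core.

#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogSegNo;	/* matches the pg_control field type */
typedef uint64_t XLogRecPtr;

/* assumption: default 16MB wal_segment_size */
static const uint64_t wal_segment_size = 16 * 1024 * 1024;

static XLogRecPtr
seg_start_lsn(XLogSegNo seg)
{
	/* first byte of the segment, i.e. offset zero within it */
	return seg * wal_segment_size;
}

int
main(void)
{
	XLogSegNo	lastFlushedSeg = 42;	/* as read from pg_control */
	XLogRecPtr	startpoint = seg_start_lsn(lastFlushedSeg);

	/* print in the usual %X/%X LSN notation, e.g. 0/2A000000 */
	printf("resume streaming at %X/%X\n",
		   (unsigned) (startpoint >> 32), (unsigned) startpoint);
	return 0;
}

This is the same derivation StartupXLOG() performs above before calling
RequestXLogStreaming(): the start point is always the first offset of the
recorded segment, never a mid-segment LSN.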