diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2ab7d80..ef6433f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -802,6 +802,7 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = false; +static bool stopOnError = false; typedef struct XLogPageReadPrivate { @@ -3971,6 +3972,49 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr) } /* + * Find latest WAL LSN + */ +static XLogRecPtr +GetLastLSN(XLogRecPtr lsn) +{ + XLogReaderState *xlogreader; + char *errormsg; + XLogPageReadPrivate private; + MemSet(&private, 0, sizeof(XLogPageReadPrivate)); + + xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private); + + stopOnError = true; + while (XLogReadRecord(xlogreader, lsn, &errormsg) != NULL) + { + lsn = InvalidXLogRecPtr; + } + stopOnError = false; + lsn = xlogreader->EndRecPtr; + XLogReaderFree(xlogreader); + + return lsn; +} + +/* + * Launch WalReceiver starting from last LSN if not started yet. + */ +static void +StartWalRcv(XLogRecPtr currLsn) +{ + if (!WalRcvStreaming() && PrimaryConnInfo && strcmp(PrimaryConnInfo, "") != 0) + { + XLogRecPtr lastLSN = GetLastLSN(currLsn); + if (lastLSN != InvalidXLogRecPtr) + { + curFileTLI = ThisTimeLineID; + RequestXLogStreaming(ThisTimeLineID, lastLSN, PrimaryConnInfo, + PrimarySlotName); + } + } +} + +/* * Remove WAL files that are not part of the given timeline's history. * * This is called during recovery, whenever we switch to follow a new @@ -6004,6 +6048,12 @@ recoveryApplyDelay(XLogReaderState *record) if (secs <= 0 && microsecs <= 0) return false; + /* + * Start WAL receiver if not started yet, to avoid WALs overflow at primary node + * or large gap between primary and replica when apply delay is specified. + */ + StartWalRcv(record->EndRecPtr); + while (true) { ResetLatch(&XLogCtl->recoveryWakeupLatch); @@ -11821,6 +11871,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, return false; /* + * If WAL receiver was altery started because of apply delay, + * thre restart it. + */ + if (WalRcvStreaming()) + ShutdownWalRcv(); + + /* * If primary_conninfo is set, launch walreceiver to try * to stream the missing WAL. * @@ -11990,6 +12047,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (readFile >= 0) return true; /* success! */ + if (stopOnError) + return false; + /* * Nope, not found in archive or pg_wal. */